SNode.C
Loading...
Searching...
No Matches
SocketConnector.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49
50#include <iomanip>
51#include <string>
52#include <utility>
53
54#endif // DOXYGEN_SHOULD_SKIP_THIS
55
56namespace core::socket::stream {
57
58 template <typename PhysicalSocketClient,
59 typename Config,
60 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
61 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::SocketConnector(
62 const std::function<void(SocketConnection*)>& onConnect,
63 const std::function<void(SocketConnection*)>& onConnected,
64 const std::function<void(SocketConnection*)>& onDisconnect,
65 const std::function<void(core::eventreceiver::ConnectEventReceiver*)>& onInitState,
66 const std::function<void(const SocketAddress&, core::socket::State)>& onStatus,
67 const std::shared_ptr<Config>& config)
68 : core::eventreceiver::ConnectEventReceiver(config->getInstanceName() + " SocketConnector", 0)
69 , onConnect(onConnect)
70 , onConnected(onConnected)
71 , onDisconnect(onDisconnect)
72 , onInitState(onInitState)
73 , onStatus(onStatus)
74 , config(config) {
75 }
76
77 template <typename PhysicalSocketServer,
78 typename Config,
79 template <typename ConfigT, typename PhysicalSocketServerT> typename SocketConnection>
80 SocketConnector<PhysicalSocketServer, Config, SocketConnection>::SocketConnector(const SocketConnector& socketConnector)
81 : core::eventreceiver::ConnectEventReceiver(socketConnector.config->getInstanceName() + " SocketConnector", 0)
82 , onConnect(socketConnector.onConnect)
83 , onConnected(socketConnector.onConnected)
84 , onDisconnect(socketConnector.onDisconnect)
85 , onInitState(socketConnector.onInitState)
86 , onStatus(socketConnector.onStatus)
87 , config(socketConnector.config) {
88 }
89
90 template <typename PhysicalSocketClient,
91 typename Config,
92 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
93 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::~SocketConnector() {
94 }
95
/**
 * Starts one connection attempt, unless the instance is disabled in its configuration.
 *
 * Visible flow: read the local (bind) and the remote address from the config, open the physical
 * client socket with the NONBLOCK flag, bind it, then connect. A failure in any of these steps is
 * classified by errno as STATE_ERROR or STATE_FATAL and reported through onStatus. A
 * BadSocketAddress thrown while obtaining either address is caught, logged and reported through
 * onStatus with a default-constructed address.
 *
 * NOTE(review): several statements are absent from this listing (for example the initializer of
 * `socketConnection`). The comments below describe only what is visible here.
 */
96 template <typename PhysicalSocketClient,
97 typename Config,
98 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
99 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::init() {
100 if (!config->getDisabled()) {
    // Outer try: covers the lookup of the bind address below.
101 try {
102 core::socket::State state = core::socket::STATE_OK;
103
104 LOG(DEBUG) << config->getInstanceName() << " Connect: starting";
105
106 SocketAddress bindAddress = config->Local::getSocketAddress();
107
    // Inner try: covers the lookup of the remote address and the open/bind/connect sequence.
108 try {
109 remoteAddress = config->Remote::getSocketAddress();
110
111 if (physicalClientSocket.open(config->getSocketOptions(), PhysicalClientSocket::Flags::NONBLOCK) < 0) {
112 PLOG(DEBUG) << config->getInstanceName() << " open " << bindAddress.toString();
113
    // Descriptor/memory exhaustion is reported as STATE_ERROR, anything else as STATE_FATAL.
114 switch (errno) {
115 case EMFILE:
116 case ENFILE:
117 case ENOBUFS:
118 case ENOMEM:
119 state = core::socket::STATE_ERROR;
120 break;
121 default:
122 state = core::socket::STATE_FATAL;
123 break;
124 }
125
126 onStatus(bindAddress, state);
127 } else {
128 LOG(TRACE) << config->getInstanceName() << " open " << bindAddress.toString() << ": success";
129
130 if (physicalClientSocket.bind(bindAddress) < 0) {
131 PLOG(DEBUG) << config->getInstanceName() << " bind " << bindAddress.toString();
132
    // Only an address already in use is reported as STATE_ERROR; every other bind failure is STATE_FATAL.
133 switch (errno) {
134 case EADDRINUSE:
135 state = core::socket::STATE_ERROR;
136 break;
137 default:
138 state = core::socket::STATE_FATAL;
139 break;
140 }
141
142 onStatus(bindAddress, state);
143 } else {
144 LOG(TRACE) << config->getInstanceName() << " bind " << bindAddress.toString() << ": success";
145
    // A negative result counts as a failure only if errno does not signal "connect in progress".
146 if (physicalClientSocket.connect(remoteAddress) < 0 && !PhysicalClientSocket::connectInProgress(errno)) {
147 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
148 switch (errno) {
149 case EADDRINUSE:
150 case EADDRNOTAVAIL:
151 case ECONNREFUSED:
152 case ENETUNREACH:
153 case ENOENT:
154 case EHOSTDOWN:
155 state = core::socket::STATE_ERROR;
156 break;
157 default:
158 state = core::socket::STATE_FATAL;
159 break;
160 }
161
    // Keep a copy of the address that just failed; useNext() presumably advances remoteAddress
    // to another candidate and returns whether one exists -- TODO confirm.
162 SocketAddress currentRemoteAddress = remoteAddress;
163 if (remoteAddress.useNext()) {
    // Another address is available: report the failed one with the NO_RETRY flag added.
164 onStatus(currentRemoteAddress, state | core::socket::State::NO_RETRY);
165
166 LOG(INFO) << config->getInstanceName() << ": Using next SocketAddress: " << remoteAddress.toString();
167
    // NOTE(review): a statement appears to be missing from this listing here.
169 } else {
170 onStatus(currentRemoteAddress, state);
171 }
172 } else {
173 LOG(TRACE) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
174
    // NOTE(review): errno is read again here, after a LOG statement and possibly after a
    // connect() call that returned >= 0; confirm errno is still meaningful at this point.
175 if (PhysicalClientSocket::connectInProgress(errno)) {
    // Connect still pending: register this receiver for the socket's descriptor.
176 if (enable(physicalClientSocket.getFd())) {
177 LOG(DEBUG)
178 << config->getInstanceName() << " enable " << remoteAddress.toString(false) << ": success";
179 } else {
180 LOG(ERROR) << config->getInstanceName() << " enable " << remoteAddress.toString()
181 << ": failed. No valid descriptor created";
182
183 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED, "SocketConnector not enabled");
184
    // NOTE(review): a statement appears to be missing from this listing here.
186 }
187 } else {
    // Connect completed immediately.
    // NOTE(review): the initializer of socketConnection is missing from this listing.
188 SocketConnection* socketConnection =
190
191 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
192 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
193 << socketConnection->getRemoteAddress().toString();
194
196
197 onConnect(socketConnection);
198 onConnected(socketConnection);
199 }
200 }
201 }
202 }
203 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
    // Thrown inside the inner try (remote address lookup onwards): log and report with an empty address.
204 core::socket::State state =
205 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
206
207 LOG(ERROR) << state.what();
208
209 onStatus({}, state);
210 }
211 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
    // Thrown outside the inner try (bind address lookup): handled the same way.
212 core::socket::State state =
213 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
214
215 LOG(ERROR) << state.what();
216
217 onStatus({}, state);
218 }
219 } else {
220 LOG(DEBUG) << config->getInstanceName() << ": disabled";
221
222 onStatus({}, core::socket::STATE_DISABLED);
223 }
224
    // NOTE(review): statements appear to be missing from both branches below in this listing.
225 if (isEnabled()) {
226 onInitState(this);
228 } else {
230 }
231 }
232
/**
 * Handles a connect event for the pending connection.
 *
 * When the receiver is enabled and the socket's error value could be queried, that value is
 * installed as errno for the rest of the block and evaluated:
 *  - 0: a SocketConnection is created from the physical socket, STATE_OK is reported and
 *    onConnect/onConnected are invoked; the receiver is then disabled.
 *  - still in progress: only a log line is written.
 *  - any other value: classified as STATE_ERROR or STATE_FATAL, reported through onStatus, and
 *    the receiver is disabled.
 * Otherwise STATE_FATAL is reported for the remote address and the receiver is disabled.
 */
233 template <typename PhysicalSocketClient,
234 typename Config,
235 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
236 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectEvent() {
237 int cErrno = 0;
238
239 if (isEnabled() && physicalClientSocket.getSockError(cErrno) == 0) { // == 0->return valid : < 0->getsockopt failed
240 const utils::PreserveErrno pe(cErrno); // errno = cErrno
241
242 if (errno == 0) {
    // Connected: the physical socket is moved into the new connection object.
243 SocketConnection* socketConnection = new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
244
245 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
246 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
247 << socketConnection->getRemoteAddress().toString();
248
249 onStatus(remoteAddress, core::socket::STATE_OK);
250
251 onConnect(socketConnection);
252 onConnected(socketConnection);
253
254 disable();
255 } else if (PhysicalClientSocket::connectInProgress(errno)) {
    // Not finished yet: keep the receiver enabled and wait for another event.
256 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": in progress:";
257 } else {
    // Keep a copy of the address that failed before useNext() is called on remoteAddress.
258 SocketAddress currentRemoteAddress = remoteAddress;
259
260 core::socket::State state = core::socket::STATE_OK;
261
262 switch (errno) {
263 case EADDRINUSE:
264 case EADDRNOTAVAIL:
265 case ECONNREFUSED:
266 case ENETUNREACH:
267 case ENOENT:
268 case EHOSTDOWN:
269 state = core::socket::STATE_ERROR;
270 break;
271 default:
272 state = core::socket::STATE_FATAL;
273 break;
274 }
275
276 if (remoteAddress.useNext()) {
    // NOTE(review): this message opens a single quote that is never closed, and it prints
    // remoteAddress after useNext() rather than currentRemoteAddress -- confirm intent.
277 PLOG(DEBUG) << config->getInstanceName() << " connect '" << remoteAddress.toString();
278
    // Another address is available: report the failed one with the NO_RETRY flag added.
279 onStatus(currentRemoteAddress, (state | core::socket::State::NO_RETRY));
280
281 LOG(DEBUG) << config->getInstanceName()
282 << " using next SocketAddress: " << config->Remote::getSocketAddress().toString();
283
    // NOTE(review): a statement appears to be missing from this listing here.
285
286 disable();
287 } else {
288 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
289
290 onStatus(currentRemoteAddress, state);
291
292 disable();
293 }
294 }
295 } else {
    // NOTE(review): this branch is also taken when isEnabled() is false, in which case the
    // "getsockopt syscall error" text does not describe the actual cause.
296 PLOG(DEBUG) << config->getInstanceName() << " getsockopt syscall error: '" << remoteAddress.toString() << "'";
297
298 onStatus(remoteAddress, core::socket::STATE_FATAL);
299 disable();
300 }
301 }
302
/**
 * Hook called for the unobserved event of this receiver.
 *
 * NOTE(review): the body is missing from this listing (one line is absent), so what the
 * function does cannot be stated from here.
 */
303 template <typename PhysicalSocketClient,
304 typename Config,
305 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
306 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::unobservedEvent() {
308 }
309
/**
 * Handles expiry of the connect timeout for the current remote address.
 *
 * If useNext() reports a further address, only a log line is visible here. Otherwise errno is
 * set to ETIMEDOUT and STATE_ERROR is reported through onStatus for the address that timed out.
 *
 * NOTE(review): two statements are missing from this listing (one in the useNext() branch and
 * one at the end of the function).
 */
310 template <typename PhysicalSocketClient,
311 typename Config,
312 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
313 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectTimeout() {
314 LOG(TRACE) << config->getInstanceName() << " connect timeout " << remoteAddress.toString();
315
    // Keep a copy of the address that timed out before useNext() is called on remoteAddress.
316 SocketAddress currentRemoteAddress = remoteAddress;
317 if (remoteAddress.useNext()) {
318 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '" << config->Remote::getSocketAddress().toString()
319 << "'";
320
    // NOTE(review): a statement appears to be missing from this listing here.
322 } else {
323 LOG(DEBUG) << config->getInstanceName() << " connect timeout '" << remoteAddress.toString() << "'";
    // Set errno explicitly before reporting the timeout through onStatus.
324 errno = ETIMEDOUT;
325
326 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
327 }
328
    // NOTE(review): a statement appears to be missing from this listing here.
330 }
331
332 template <typename PhysicalSocketClient,
333 typename Config,
334 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
335 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::destruct() {
336 if (!config->getDisabled()) {
337 onInitState(this);
338 }
339
340 delete this;
341 }
342
343} // namespace core::socket::stream
void setTimeout(const utils::Timeval &timeout)
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:61
const std::shared_ptr< Config > config
Definition Socket.h:75
Socket(const std::shared_ptr< Config > &config)
Definition Socket.hpp:56
Config & getConfig() const
Definition Socket.hpp:65
Socket(const std::string &name)
Definition Socket.hpp:51
std::string what() const
Definition State.cpp:114
static constexpr int NO_RETRY
Definition State.h:59
State operator|(int state)
Definition State.cpp:102
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
const SocketAddress & getBindAddress() const final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
SocketConnector(const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(core::eventreceiver::ConnectEventReceiver *)> &onInitState, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
typename PhysicalClientSocket::SocketAddress SocketAddress
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
SocketConnectionT< PhysicalClientSocket, Config > SocketConnection
std::function< void(SocketConnection *)> onConnected
std::function< void(SocketConnection *)> onDisconnect
PhysicalSocketClientT PhysicalClientSocket
std::function< void(core::eventreceiver::ConnectEventReceiver *)> onInitState
std::function< void(SocketConnection *)> onConnect
void onReadError(int errnum) override
void onWriteError(int errnum) override
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
typename Super::SocketAddress SocketAddress
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(core::eventreceiver::ConnectEventReceiver *)> &onInitState, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnector< PhysicalClientSocketT, ConfigT, core::socket::stream::legacy::SocketConnection > Super
SocketConnector(const SocketConnector &socketConnector)
typename Super::SocketConnection SocketConnection
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
State
Definition State.h:51
@ RUNNING
Definition State.h:51
State eventLoopState()
Definition State.cpp:52
std::string sigabbrev_np(int sig)
Definition signal.cpp:59