SNode.C
Loading...
Searching...
No Matches
SocketConnector.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49
50#include <iomanip>
51#include <string>
52#include <utility>
53
54#endif // DOXYGEN_SHOULD_SKIP_THIS
55
56namespace core::socket::stream {
57
58 template <typename PhysicalSocketClient,
59 typename Config,
60 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
61 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::SocketConnector(
62 const std::shared_ptr<SocketContextFactory>& socketContextFactory,
63 const std::function<void(SocketConnection*)>& onConnect,
64 const std::function<void(SocketConnection*)>& onConnected,
65 const std::function<void(SocketConnection*)>& onDisconnect,
66 const std::function<void(const SocketAddress&, core::socket::State)>& onStatus,
67 const std::shared_ptr<Config>& config)
68 : core::eventreceiver::ConnectEventReceiver(config->getInstanceName() + " SocketConnector", 0)
69 , socketContextFactory(socketContextFactory)
70 , onConnect(onConnect)
71 , onConnected(onConnected)
72 , onDisconnect(onDisconnect)
73 , onStatus(onStatus)
74 , config(config) {
75 atNextTick([this]() {
76 if (core::eventLoopState() == core::State::RUNNING) {
77 init();
78 } else {
80 }
81 });
82 }
84 template <typename PhysicalSocketServer,
85 typename Config,
86 template <typename ConfigT, typename PhysicalSocketServerT> typename SocketConnection>
87 SocketConnector<PhysicalSocketServer, Config, SocketConnection>::SocketConnector(const SocketConnector& socketConnector)
88 : core::eventreceiver::ConnectEventReceiver(socketConnector.config->getInstanceName() + " SocketConnector", 0)
90 , onConnect(socketConnector.onConnect)
91 , onConnected(socketConnector.onConnected)
92 , onDisconnect(socketConnector.onDisconnect)
93 , onStatus(socketConnector.onStatus)
94 , config(socketConnector.config) {
95 atNextTick([this]() {
96 if (core::eventLoopState() == core::State::RUNNING) {
98 } else {
100 }
101 });
102 }
103
104 template <typename PhysicalSocketClient,
105 typename Config,
106 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
107 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::~SocketConnector() {
108 }
109
110 template <typename PhysicalSocketClient,
111 typename Config,
112 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
113 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::init() {
114 if (!config->getDisabled()) {
115 try {
116 LOG(TRACE) << config->getInstanceName() << " Starting";
117
118 remoteAddress = config->Remote::getSocketAddress();
119 SocketAddress localAddress = config->Local::getSocketAddress();
120
121 try {
122 core::socket::State state = core::socket::STATE_OK;
123
124 if (physicalClientSocket.open(config->getSocketOptions(), PhysicalClientSocket::Flags::NONBLOCK) < 0) {
125 switch (errno) {
126 case EMFILE:
127 case ENFILE:
128 case ENOBUFS:
129 case ENOMEM:
130 PLOG(DEBUG) << config->getInstanceName() << " open: '" << localAddress.toString() << "'";
131
132 state = core::socket::STATE_ERROR;
133 break;
134 default:
135 PLOG(DEBUG) << config->getInstanceName() << " open: '" << localAddress.toString() << "'";
136
137 state = core::socket::STATE_FATAL;
138 break;
139 }
140
142 } else if (physicalClientSocket.bind(localAddress) < 0) {
143 switch (errno) {
144 case EADDRINUSE:
145 PLOG(DEBUG) << config->getInstanceName() << " bind: '" << localAddress.toString() << "'";
146
147 state = core::socket::STATE_ERROR;
148 break;
149 default:
150 PLOG(DEBUG) << config->getInstanceName() << " bind: '" << localAddress.toString() << "'";
151
152 state = core::socket::STATE_FATAL;
153 break;
154 }
155
157 } else if (physicalClientSocket.connect(remoteAddress) < 0 && !PhysicalClientSocket::connectInProgress(errno)) {
158 switch (errno) {
159 case EADDRINUSE:
160 case EADDRNOTAVAIL:
161 case ECONNREFUSED:
162 case ENETUNREACH:
163 case ENOENT:
164 case EHOSTDOWN:
165 PLOG(DEBUG) << config->getInstanceName() << " connect: '" << remoteAddress.toString() << "'";
166
167 state = core::socket::STATE_ERROR;
168 break;
169 default:
170 PLOG(DEBUG) << config->getInstanceName() << " connect: '" << remoteAddress.toString() << "'";
171
172 state = core::socket::STATE_FATAL;
173 break;
174 }
175
176 SocketAddress currentRemoteAddress = remoteAddress;
177 if (remoteAddress.useNext()) {
178 onStatus(currentRemoteAddress, state | core::socket::State::NO_RETRY);
179
180 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '"
181 << config->Remote::getSocketAddress().toString() << "'";
182
184 } else {
185 onStatus(currentRemoteAddress, state);
186 }
187 } else if (PhysicalClientSocket::connectInProgress(errno)) {
188 if (enable(physicalClientSocket.getFd())) {
189 LOG(DEBUG) << config->getInstanceName() << " connect in progress: '" << remoteAddress.toString() << "'";
190 } else {
191 LOG(DEBUG) << config->getInstanceName() << " not enabled: '" << remoteAddress.toString() << "'";
192
193 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED, "SocketConnector not enabled");
194
196 }
197 } else {
198 LOG(DEBUG) << config->getInstanceName() << " [" << physicalClientSocket.getFd() << "] connect success: '"
199 << remoteAddress.toString() << "'";
200
201 onStatus(remoteAddress, core::socket::STATE_OK);
202
203 SocketConnection* socketConnection = new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
204
205 onConnect(socketConnection);
206 onConnected(socketConnection);
207 }
208 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
209 LOG(DEBUG) << config->getInstanceName() << " " << badSocketAddress.what();
210
211 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
212 }
213 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
214 LOG(DEBUG) << config->getInstanceName() << " " << badSocketAddress.what();
215
216 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
217 }
218 } else {
219 LOG(DEBUG) << config->getInstanceName() << " disabled";
220
221 onStatus({}, core::socket::STATE_DISABLED);
222 }
223
224 if (isEnabled()) {
226 } else {
228 }
229 }
230
231 template <typename PhysicalSocketClient,
232 typename Config,
233 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
234 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectEvent() {
235 int cErrno = 0;
236
237 if (physicalClientSocket.getSockError(cErrno) == 0) { // == 0->return valid : < 0->getsockopt failed
238 const utils::PreserveErrno pe(cErrno); // errno = cErrno
239
240 if (errno == 0) {
241 LOG(DEBUG) << config->getInstanceName() << " [" << physicalClientSocket.getFd() << "] connect success: '"
242 << remoteAddress.toString() << "'";
243
244 onStatus(remoteAddress, core::socket::STATE_OK);
245
246 SocketConnection* socketConnection = new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
247
248 onConnect(socketConnection);
249 onConnected(socketConnection);
250
251 disable();
252 } else if (PhysicalClientSocket::connectInProgress(errno)) {
253 LOG(TRACE) << config->getInstanceName() << " connect still in progress: '" << remoteAddress.toString() << "'";
254 } else {
255 SocketAddress currentRemoteAddress = remoteAddress;
256 if (remoteAddress.useNext()) {
257 core::socket::State state = core::socket::STATE_OK;
258
259 switch (errno) {
260 case EADDRINUSE:
261 case EADDRNOTAVAIL:
262 case ECONNREFUSED:
263 case ENETUNREACH:
264 case ENOENT:
265 case EHOSTDOWN:
266 PLOG(DEBUG) << config->getInstanceName() << " connect: '" << remoteAddress.toString() << "'";
267
268 state = core::socket::STATE_ERROR;
269 break;
270 default:
271 PLOG(DEBUG) << config->getInstanceName() << ": connect: '" << remoteAddress.toString() << "'";
272
273 state = core::socket::STATE_FATAL;
274 break;
275 }
276
277 onStatus(currentRemoteAddress, (state | core::socket::State::NO_RETRY));
278
279 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '"
280 << config->Remote::getSocketAddress().toString() << "'";
281
283
284 disable();
285 } else {
286 core::socket::State state = core::socket::STATE_OK;
287
288 switch (errno) {
289 case EADDRINUSE:
290 case EADDRNOTAVAIL:
291 case ECONNREFUSED:
292 case ENETUNREACH:
293 case ENOENT:
294 case EHOSTDOWN:
295 PLOG(DEBUG) << config->getInstanceName() << " connect: '" << remoteAddress.toString() << "'";
296
297 state = core::socket::STATE_ERROR;
298 break;
299 default:
300 PLOG(DEBUG) << config->getInstanceName() << " connect: '" << remoteAddress.toString() << "'";
301
302 state = core::socket::STATE_FATAL;
303 break;
304 }
305
306 onStatus(currentRemoteAddress, state);
307
308 disable();
309 }
310 }
311 } else {
312 PLOG(DEBUG) << config->getInstanceName() << " getsockopt syscall error: '" << remoteAddress.toString() << "'";
313
314 onStatus(remoteAddress, core::socket::STATE_FATAL);
315 disable();
316 }
317 }
318
319 template <typename PhysicalSocketClient,
320 typename Config,
321 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
322 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::unobservedEvent() {
324 }
325
326 template <typename PhysicalSocketClient,
327 typename Config,
328 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
329 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectTimeout() {
330 LOG(TRACE) << config->getInstanceName() << " connect timeout " << remoteAddress.toString();
331
332 SocketAddress currentRemoteAddress = remoteAddress;
333 if (remoteAddress.useNext()) {
334 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '" << config->Remote::getSocketAddress().toString()
335 << "'";
336
338 } else {
339 LOG(DEBUG) << config->getInstanceName() << " connect timeout '" << remoteAddress.toString() << "'";
340 errno = ETIMEDOUT;
341
342 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
343 }
344
346 }
347
348 template <typename PhysicalSocketClient,
349 typename Config,
350 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
351 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::destruct() {
352 delete this;
353 }
354
355} // namespace core::socket::stream
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:56
Config & getConfig() const
Definition Socket.hpp:60
std::shared_ptr< Config > config
Definition Socket.h:72
Socket(const std::string &name)
Definition Socket.hpp:51
static constexpr int NO_RETRY
Definition State.h:59
State operator|(int state)
Definition State.cpp:102
void onReceivedFromPeer(std::size_t available) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * socketContext
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
std::function< void(SocketConnection *)> onConnected
std::shared_ptr< core::socket::stream::SocketContextFactory > socketContextFactory
std::function< void(SocketConnection *)> onDisconnect
std::function< void(SocketConnection *)> onConnect
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
SocketConnector(const SocketConnector &socketConnector)
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
State
Definition State.h:51
State eventLoopState()
Definition State.cpp:52
std::string sigabbrev_np(int sig)
Definition signal.cpp:59