SNode.C
Loading...
Searching...
No Matches
SocketConnector.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49
50#include <iomanip>
51#include <string>
52#include <utility>
53
54#endif // DOXYGEN_SHOULD_SKIP_THIS
55
56namespace core::socket::stream {
57
// Constructs a connector from the four callbacks and the shared configuration.
//
// The ConnectEventReceiver base is given the name "<instance name> SocketConnector"
// (instance name taken from config) and the callbacks/config are stored in the
// members of the same name.
//
// Nothing is started synchronously: a lambda capturing `this` is queued with
// atNextTick() and calls init() only if core::eventLoopState() reports
// core::State::RUNNING when the lambda runs.
58 template <typename PhysicalSocketClient,
59 typename Config,
60 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
61 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::SocketConnector(
62 const std::function<void(SocketConnection*)>& onConnect,
63 const std::function<void(SocketConnection*)>& onConnected,
64 const std::function<void(SocketConnection*)>& onDisconnect,
65 const std::function<void(const SocketAddress&, core::socket::State)>& onStatus,
66 const std::shared_ptr<Config>& config)
67 : core::eventreceiver::ConnectEventReceiver(config->getInstanceName() + " SocketConnector", 0)
68 , onConnect(onConnect)
69 , onConnected(onConnected)
70 , onDisconnect(onDisconnect)
71 , onStatus(onStatus)
72 , config(config) {
// The lambda dereferences `this` later, so the object has to be alive when the tick fires.
73 atNextTick([this]() {
74 if (core::eventLoopState() == core::State::RUNNING) {
75 init();
76 } else {
// NOTE(review): listing line 77 (the body of this else branch) is absent from this
// rendering; what happens when the loop is not running cannot be read here — confirm
// against the original file.
78 }
79 });
80 }
81
// Copy constructor: builds a new connector that reuses the callbacks and the shared
// config of `socketConnector`. Only those five members and the base-class name are
// taken from the source object; no other member is mentioned in the initializer list.
//
// As in the primary constructor, startup is deferred via atNextTick(): init() runs
// only if the event loop is in core::State::RUNNING when the queued lambda executes.
//
// NOTE(review): the template parameters are spelled PhysicalSocketServer /
// PhysicalSocketServerT here while every other definition in this file uses
// PhysicalSocketClient / PhysicalSocketClientT. The names are positional, so this is
// a naming inconsistency only.
82 template <typename PhysicalSocketServer,
83 typename Config,
84 template <typename ConfigT, typename PhysicalSocketServerT> typename SocketConnection>
85 SocketConnector<PhysicalSocketServer, Config, SocketConnection>::SocketConnector(const SocketConnector& socketConnector)
86 : core::eventreceiver::ConnectEventReceiver(socketConnector.config->getInstanceName() + " SocketConnector", 0)
87 , onConnect(socketConnector.onConnect)
88 , onConnected(socketConnector.onConnected)
89 , onDisconnect(socketConnector.onDisconnect)
90 , onStatus(socketConnector.onStatus)
91 , config(socketConnector.config) {
92 atNextTick([this]() {
93 if (core::eventLoopState() == core::State::RUNNING) {
94 init();
95 } else {
// NOTE(review): listing line 96 (the body of this else branch) is absent from this
// rendering — confirm against the original file.
97 }
98 });
99 }
100
// Destructor. The body is empty: this definition performs no explicit cleanup of its
// own, so only the implicit destruction of members and base classes takes place.
101 template <typename PhysicalSocketClient,
102 typename Config,
103 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
104 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::~SocketConnector() {
105 }
106
// Starts the connection attempt described by `config`.
//
// Flow, as far as this listing shows:
//   1. If the config is disabled, log it and report STATE_DISABLED through onStatus.
//   2. Otherwise read the local (bind) and remote addresses from the config, then
//      open() the socket with the NONBLOCK flag, bind() it and connect() it.
//   3. Each failing step logs with PLOG, maps errno to STATE_ERROR (for the listed
//      errno values) or STATE_FATAL (everything else) and reports through onStatus.
//   4. A connect() that is still in progress registers the descriptor with enable();
//      a connect() that completed immediately reports to onConnect/onConnected.
//   5. SocketAddress::BadSocketAddress thrown while resolving either address is
//      converted into a State and reported through onStatus with an empty address.
//
// NOTE(review): listing lines 179, 196, 200, 206, 237 and 239 are absent from this
// rendering. Each gap is marked below; the statements that belong there must be
// confirmed against the original file.
107 template <typename PhysicalSocketClient,
108 typename Config,
109 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
110 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::init() {
111 if (!config->getDisabled()) {
112 try {
113 core::socket::State state = core::socket::STATE_OK;
114
115 LOG(TRACE) << config->getInstanceName() << " Starting";
116
// A BadSocketAddress thrown here is handled by the outer catch at the end of this try.
117 SocketAddress bindAddress = config->Local::getSocketAddress();
118
119 try {
// A BadSocketAddress thrown here is handled by the inner catch.
120 remoteAddress = config->Remote::getSocketAddress();
121
122 if (physicalClientSocket.open(config->getSocketOptions(), PhysicalClientSocket::Flags::NONBLOCK) < 0) {
123 PLOG(DEBUG) << config->getInstanceName() << " open " << bindAddress.toString();
124
// NOTE(review): errno is read after the PLOG statement; this assumes PLOG leaves
// errno untouched — TODO confirm. The same pattern repeats for bind and connect.
125 switch (errno) {
126 case EMFILE:
127 case ENFILE:
128 case ENOBUFS:
129 case ENOMEM:
130 state = core::socket::STATE_ERROR;
131 break;
132 default:
133 state = core::socket::STATE_FATAL;
134 break;
135 }
136
137 onStatus(bindAddress, state);
138 } else {
139 LOG(TRACE) << config->getInstanceName() << " open " << bindAddress.toString() << ": success";
140
141 if (physicalClientSocket.bind(bindAddress) < 0) {
142 PLOG(DEBUG) << config->getInstanceName() << " bind " << bindAddress.toString();
143
144 switch (errno) {
145 case EADDRINUSE:
146 state = core::socket::STATE_ERROR;
147 break;
148 default:
149 state = core::socket::STATE_FATAL;
150 break;
151 }
152
153 onStatus(bindAddress, state);
154 } else {
155 LOG(TRACE) << config->getInstanceName() << " bind " << bindAddress.toString() << ": success";
156
// Failure branch: connect() returned < 0 and errno is not an "in progress" value.
157 if (physicalClientSocket.connect(remoteAddress) < 0 && !PhysicalClientSocket::connectInProgress(errno)) {
158 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
159 switch (errno) {
160 case EADDRINUSE:
161 case EADDRNOTAVAIL:
162 case ECONNREFUSED:
163 case ENETUNREACH:
164 case ENOENT:
165 case EHOSTDOWN:
166 state = core::socket::STATE_ERROR;
167 break;
168 default:
169 state = core::socket::STATE_FATAL;
170 break;
171 }
172
// Keep a copy of the address that just failed so it can still be reported after
// useNext() has been called on remoteAddress.
173 SocketAddress currentRemoteAddress = remoteAddress;
174 if (remoteAddress.useNext()) {
// The NO_RETRY flag is OR-ed into the reported state when useNext() returned true.
175 onStatus(currentRemoteAddress, state | core::socket::State::NO_RETRY);
176
177 LOG(INFO) << config->getInstanceName() << ": Using next SocketAddress: " << remoteAddress.toString();
178
// NOTE(review): listing line 179 is absent from this rendering; presumably the
// statement that starts the attempt with the next address — confirm.
180 } else {
181 onStatus(currentRemoteAddress, state);
182 }
183 } else {
// NOTE(review): this "success" trace is also emitted when the connect is only in
// progress, because the in-progress check comes after it.
184 LOG(TRACE) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
185
186 if (PhysicalClientSocket::connectInProgress(errno)) {
// Connect still pending: register the descriptor through enable().
187 if (enable(physicalClientSocket.getFd())) {
188 LOG(DEBUG)
189 << config->getInstanceName() << " enable " << remoteAddress.toString(false) << ": success";
190 } else {
191 LOG(ERROR) << config->getInstanceName() << " enable " << remoteAddress.toString()
192 << ": failed. No valid descriptor created";
193
194 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED, "SocketConnector not enabled");
195
// NOTE(review): listing line 196 is absent from this rendering; `state` is assigned
// above but no visible statement reports it — confirm against the original file.
197 }
198 } else {
// Connect completed immediately.
199 SocketConnection* socketConnection =
// NOTE(review): listing line 200 (the initializer of socketConnection) is absent
// from this rendering — confirm against the original file.
201
202 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
203 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
204 << socketConnection->getRemoteAddress().toString();
205
// NOTE(review): listing line 206 is absent from this rendering — confirm.
207
208 onConnect(socketConnection);
209 onConnected(socketConnection);
210 }
211 }
212 }
213 }
214 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
// Remote-address failure: convert the exception into a State and report it with an
// empty address.
215 core::socket::State state =
216 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
217
218 LOG(ERROR) << state.what();
219
220 onStatus({}, state);
221 }
222 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
// Same conversion for a BadSocketAddress that escaped the inner try (the local
// address lookup above is outside the inner try).
223 core::socket::State state =
224 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
225
226 LOG(ERROR) << state.what();
227
228 onStatus({}, state);
229 }
230 } else {
231 LOG(DEBUG) << config->getInstanceName() << ": disabled";
232
233 onStatus({}, core::socket::STATE_DISABLED);
234 }
235
// NOTE(review): both branches below are empty in this rendering — listing lines 237
// and 239 are absent. Confirm what is done for an enabled and for a not-enabled
// connector.
236 if (isEnabled()) {
238 } else {
240 }
241 }
242
// Handles the connect event for a connect that was pending when init() registered
// the descriptor.
//
// The pending socket error is fetched with getSockError() and drives four outcomes:
//   - error value 0: a SocketConnection is created from the physical socket,
//     STATE_OK is reported, onConnect/onConnected are invoked, then disable().
//   - still in progress: only a debug log line is written.
//   - any other error: mapped to STATE_ERROR/STATE_FATAL and reported through
//     onStatus (with NO_RETRY OR-ed in when useNext() returned true), then disable().
//   - getSockError() itself failed: STATE_FATAL is reported, then disable().
//
// NOTE(review): listing line 294 is absent from this rendering; see the marker below.
243 template <typename PhysicalSocketClient,
244 typename Config,
245 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
246 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectEvent() {
247 int cErrno = 0;
248
249 if (physicalClientSocket.getSockError(cErrno) == 0) { // == 0->return valid : < 0->getsockopt failed
250 const utils::PreserveErrno pe(cErrno); // errno = cErrno
251
252 if (errno == 0) {
// The physical socket is moved into the new connection object, so
// physicalClientSocket is in a moved-from state afterwards.
// NOTE(review): the connection is created with a raw `new`; who deletes it is not
// visible in this function — confirm ownership.
253 SocketConnection* socketConnection = new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
254
255 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
256 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
257 << socketConnection->getRemoteAddress().toString();
258
259 onStatus(remoteAddress, core::socket::STATE_OK);
260
261 onConnect(socketConnection);
262 onConnected(socketConnection);
263
264 disable();
265 } else if (PhysicalClientSocket::connectInProgress(errno)) {
266 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": in progress:";
267 } else {
// Copy of the address that failed, taken before useNext() is called on remoteAddress.
268 SocketAddress currentRemoteAddress = remoteAddress;
269
270 core::socket::State state = core::socket::STATE_OK;
271
272 switch (errno) {
273 case EADDRINUSE:
274 case EADDRNOTAVAIL:
275 case ECONNREFUSED:
276 case ENETUNREACH:
277 case ENOENT:
278 case EHOSTDOWN:
279 state = core::socket::STATE_ERROR;
280 break;
281 default:
282 state = core::socket::STATE_FATAL;
283 break;
284 }
285
286 if (remoteAddress.useNext()) {
// NOTE(review): this message prints remoteAddress after useNext() was called on it,
// and its opening single quote is never closed; the else branch logs without quotes.
287 PLOG(DEBUG) << config->getInstanceName() << " connect '" << remoteAddress.toString();
288
289 onStatus(currentRemoteAddress, (state | core::socket::State::NO_RETRY));
290
291 LOG(DEBUG) << config->getInstanceName()
292 << " using next SocketAddress: " << config->Remote::getSocketAddress().toString();
293
// NOTE(review): listing line 294 is absent from this rendering; presumably the
// statement that starts the attempt with the next address — confirm.
295
296 disable();
297 } else {
298 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
299
300 onStatus(currentRemoteAddress, state);
301
302 disable();
303 }
304 }
305 } else {
306 PLOG(DEBUG) << config->getInstanceName() << " getsockopt syscall error: '" << remoteAddress.toString() << "'";
307
308 onStatus(remoteAddress, core::socket::STATE_FATAL);
309 disable();
310 }
311 }
312
// Handler named unobservedEvent(); when it is invoked is not visible in this file.
// NOTE(review): the body is empty in this rendering because listing line 317 is
// absent — confirm the actual statement against the original file.
313 template <typename PhysicalSocketClient,
314 typename Config,
315 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
316 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::unobservedEvent() {
318 }
319
// Handles a connect timeout for the current remote address.
//
// If remoteAddress.useNext() returns true, only the next address is logged in the
// visible code. Otherwise errno is set to ETIMEDOUT and STATE_ERROR is reported
// through onStatus for the address that timed out.
//
// NOTE(review): listing lines 331 and 339 are absent from this rendering; see the
// markers below.
320 template <typename PhysicalSocketClient,
321 typename Config,
322 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
323 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectTimeout() {
324 LOG(TRACE) << config->getInstanceName() << " connect timeout " << remoteAddress.toString();
325
// Copy of the address that timed out, taken before useNext() is called on remoteAddress.
326 SocketAddress currentRemoteAddress = remoteAddress;
327 if (remoteAddress.useNext()) {
328 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '" << config->Remote::getSocketAddress().toString()
329 << "'";
330
// NOTE(review): listing line 331 is absent from this rendering; presumably the
// statement that starts the attempt with the next address — confirm.
332 } else {
333 LOG(DEBUG) << config->getInstanceName() << " connect timeout '" << remoteAddress.toString() << "'";
// errno is set before the callback so that it reads ETIMEDOUT while onStatus runs.
334 errno = ETIMEDOUT;
335
336 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
337 }
338
// NOTE(review): listing line 339 is absent from this rendering — confirm the final
// statement of this handler against the original file.
340 }
341
// Destroys this connector by executing `delete this`.
// After the call the object no longer exists, so the caller must not access any
// member afterwards.
// NOTE(review): `delete this` is only valid for objects created with `new`;
// presumably every SocketConnector is heap-allocated — confirm at the creation sites.
342 template <typename PhysicalSocketClient,
343 typename Config,
344 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
345 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::destruct() {
346 delete this;
347 }
348
349} // namespace core::socket::stream
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:61
Socket(const std::shared_ptr< Config > &config)
Definition Socket.hpp:56
Config & getConfig() const
Definition Socket.hpp:65
std::shared_ptr< Config > config
Definition Socket.h:75
Socket(const std::string &name)
Definition Socket.hpp:51
std::string what() const
Definition State.cpp:114
static constexpr int NO_RETRY
Definition State.h:59
State operator|(int state)
Definition State.cpp:102
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
typename PhysicalClientSocket::SocketAddress SocketAddress
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
SocketConnectionT< PhysicalClientSocket, Config > SocketConnection
std::function< void(SocketConnection *)> onConnected
std::function< void(SocketConnection *)> onDisconnect
PhysicalSocketClientT PhysicalClientSocket
std::function< void(SocketConnection *)> onConnect
SocketConnector(const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
void onReadError(int errnum) override
void onWriteError(int errnum) override
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
typename Super::SocketAddress SocketAddress
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnector< PhysicalClientSocketT, ConfigT, core::socket::stream::legacy::SocketConnection > Super
SocketConnector(const SocketConnector &socketConnector)
typename Super::SocketConnection SocketConnection
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
State
Definition State.h:51
@ RUNNING
Definition State.h:51
State eventLoopState()
Definition State.cpp:52
std::string sigabbrev_np(int sig)
Definition signal.cpp:59