SNode.C
Loading...
Searching...
No Matches
SocketConnector.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49
50#include <iomanip>
51#include <string>
52#include <utility>
53
54#endif // DOXYGEN_SHOULD_SKIP_THIS
55
56namespace core::socket::stream {
57
// Primary constructor.
//
// Initialises the ConnectEventReceiver base with the name
// "<instance name> SocketConnector" (taken from config->getInstanceName()) and
// a second argument of 0, then stores the context factory, the three
// connection callbacks, the status callback and the shared configuration.
//
// Parameters:
//   socketContextFactory  factory stored in the member of the same name
//   onConnect             invoked with the new SocketConnection (see init()/connectEvent())
//   onConnected           invoked directly after onConnect with the same connection
//   onDisconnect          handed to every SocketConnection that is created
//   onStatus              receives (address, state) for every reported outcome
//   config                shared configuration; dereferenced here, so it must be non-null
//
// No socket work is done in the constructor itself: a callback is queued with
// atNextTick() and the actual start-up happens there.
58 template <typename PhysicalSocketClient,
59 typename Config,
60 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
61 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::SocketConnector(
62 const std::shared_ptr<SocketContextFactory>& socketContextFactory,
63 const std::function<void(SocketConnection*)>& onConnect,
64 const std::function<void(SocketConnection*)>& onConnected,
65 const std::function<void(SocketConnection*)>& onDisconnect,
66 const std::function<void(const SocketAddress&, core::socket::State)>& onStatus,
67 const std::shared_ptr<Config>& config)
68 : core::eventreceiver::ConnectEventReceiver(config->getInstanceName() + " SocketConnector", 0)
69 , socketContextFactory(socketContextFactory)
70 , onConnect(onConnect)
71 , onConnected(onConnected)
72 , onDisconnect(onDisconnect)
73 , onStatus(onStatus)
74 , config(config) {
// The lambda captures `this`; presumably the object is guaranteed to be alive
// when the queued callback runs — TODO confirm against the event loop contract.
75 atNextTick([this]() {
// init() is only called when the event loop reports State::RUNNING.
76 if (core::eventLoopState() == core::State::RUNNING) {
77 init();
78 } else {
// NOTE(review): embedded line 79 is absent from this listing, so the branch
// taken when the loop is not running looks empty here — confirm upstream.
80 }
81 });
82 }
// Copy constructor.
//
// Builds a fresh ConnectEventReceiver base named
// "<instance name> SocketConnector" from the source object's config and copies
// the onConnect / onConnected / onDisconnect / onStatus callbacks and the
// shared config pointer. As in the primary constructor, follow-up work is
// queued with atNextTick() rather than done inline.
//
// NOTE(review): the first template parameter is spelled PhysicalSocketServer
// here, while every other definition in this file uses PhysicalSocketClient.
// The name is only a local label for the same parameter position, but the
// inconsistency is worth tidying.
// NOTE(review): embedded lines 89, 97 and 99 are absent from this listing.
// As shown, socketContextFactory is not in the initialiser list and both
// branches of the queued callback are empty — confirm against upstream.
84 template <typename PhysicalSocketServer,
85 typename Config,
86 template <typename ConfigT, typename PhysicalSocketServerT> typename SocketConnection>
87 SocketConnector<PhysicalSocketServer, Config, SocketConnection>::SocketConnector(const SocketConnector& socketConnector)
88 : core::eventreceiver::ConnectEventReceiver(socketConnector.config->getInstanceName() + " SocketConnector", 0)
90 , onConnect(socketConnector.onConnect)
91 , onConnected(socketConnector.onConnected)
92 , onDisconnect(socketConnector.onDisconnect)
93 , onStatus(socketConnector.onStatus)
94 , config(socketConnector.config) {
// The lambda captures `this`; presumably the new object outlives the tick —
// TODO confirm.
95 atNextTick([this]() {
96 if (core::eventLoopState() == core::State::RUNNING) {
98 } else {
100 }
101 });
102 }
103
// Destructor. The body is empty: no explicit clean-up is performed here, so
// the members are released by their own destructors.
104 template <typename PhysicalSocketClient,
105 typename Config,
106 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
107 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::~SocketConnector() {
108 }
109
// Starts one connection attempt: open -> bind -> connect.
//
// Flow, as visible in this listing:
//   * If config->getDisabled() is true, nothing is opened and
//     onStatus({}, STATE_DISABLED) is reported.
//   * Otherwise the local (bind) and remote addresses are fetched from the
//     config, the physical socket is opened with the NONBLOCK flag, bound and
//     connected. Each failing step logs via PLOG, maps errno to STATE_ERROR
//     or STATE_FATAL and reports it through onStatus().
//   * A connect that is still in progress registers the descriptor with
//     enable(); an immediately completed connect creates the SocketConnection
//     and fires onConnect() followed by onConnected().
//   * SocketAddress::BadSocketAddress thrown while resolving either address is
//     converted into a State and reported with an empty address.
//
// NOTE(review): embedded lines 182, 199, 209, 240 and 242 are absent from this
// listing; the affected spots are marked below. Do not treat the empty
// branches as intended behaviour without checking the upstream file.
110 template <typename PhysicalSocketClient,
111 typename Config,
112 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
113 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::init() {
114 if (!config->getDisabled()) {
// Outer try: its handler (embedded line 225) reports a BadSocketAddress raised
// while fetching the local address.
115 try {
116 core::socket::State state = core::socket::STATE_OK;
117
118 LOG(TRACE) << config->getInstanceName() << " Starting";
119
120 SocketAddress bindAddress = config->Local::getSocketAddress();
121
// Inner try: its handler (embedded line 217) reports a BadSocketAddress raised
// while fetching the remote address or anywhere in the steps below.
122 try {
123 remoteAddress = config->Remote::getSocketAddress();
124
125 if (physicalClientSocket.open(config->getSocketOptions(), PhysicalClientSocket::Flags::NONBLOCK) < 0) {
126 PLOG(DEBUG) << config->getInstanceName() << " open " << bindAddress.toString();
127
// open() failed: descriptor/memory exhaustion is reported as STATE_ERROR,
// every other errno as STATE_FATAL.
128 switch (errno) {
129 case EMFILE:
130 case ENFILE:
131 case ENOBUFS:
132 case ENOMEM:
133 state = core::socket::STATE_ERROR;
134 break;
135 default:
136 state = core::socket::STATE_FATAL;
137 break;
138 }
139
140 onStatus(bindAddress, state);
141 } else {
142 LOG(TRACE) << config->getInstanceName() << " open " << bindAddress.toString() << ": success";
143
144 if (physicalClientSocket.bind(bindAddress) < 0) {
145 PLOG(DEBUG) << config->getInstanceName() << " bind " << bindAddress.toString();
146
// bind() failed: only EADDRINUSE is reported as STATE_ERROR, anything else as
// STATE_FATAL.
147 switch (errno) {
148 case EADDRINUSE:
149 state = core::socket::STATE_ERROR;
150 break;
151 default:
152 state = core::socket::STATE_FATAL;
153 break;
154 }
155
156 onStatus(bindAddress, state);
157 } else {
158 LOG(TRACE) << config->getInstanceName() << " bind " << bindAddress.toString() << ": success";
159
// A negative return that connectInProgress(errno) accepts is not a failure:
// that case falls through to the else branch below.
160 if (physicalClientSocket.connect(remoteAddress) < 0 && !PhysicalClientSocket::connectInProgress(errno)) {
161 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
// connect() failed outright: the listed errno values are reported as
// STATE_ERROR, everything else as STATE_FATAL.
162 switch (errno) {
163 case EADDRINUSE:
164 case EADDRNOTAVAIL:
165 case ECONNREFUSED:
166 case ENETUNREACH:
167 case ENOENT:
168 case EHOSTDOWN:
169 state = core::socket::STATE_ERROR;
170 break;
171 default:
172 state = core::socket::STATE_FATAL;
173 break;
174 }
175
// Copy the address that just failed before useNext() is called, so the status
// report names the failed address.
176 SocketAddress currentRemoteAddress = remoteAddress;
177 if (remoteAddress.useNext()) {
// useNext() returned true: the failure is reported with the NO_RETRY flag
// OR-ed into the state.
178 onStatus(currentRemoteAddress, state | core::socket::State::NO_RETRY);
179
180 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: " << remoteAddress.toString();
181
// NOTE(review): embedded line 182 is absent from this listing — confirm what
// follows the log statement upstream.
183 } else {
184 onStatus(currentRemoteAddress, state);
185 }
186 } else {
// NOTE(review): this branch is also reached while the connect is merely in
// progress, yet the trace message says ": success".
187 LOG(TRACE) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
188
// NOTE(review): errno is read again here. If connect() returned success,
// errno was not set by that call and may hold a stale value (the LOG above
// may also have touched it) — confirm this cannot misclassify a completed
// connect as in progress.
189 if (PhysicalClientSocket::connectInProgress(errno)) {
// Register the descriptor with the event receiver via enable().
190 if (enable(physicalClientSocket.getFd())) {
191 LOG(DEBUG)
192 << config->getInstanceName() << " enable " << remoteAddress.toString(false) << ": success";
193 } else {
194 LOG(ERROR) << config->getInstanceName() << " enable " << remoteAddress.toString()
195 << ": failed. No valid descriptor created";
196
197 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED, "SocketConnector not enabled");
198
// NOTE(review): embedded line 199 is absent from this listing; as shown, the
// state built above is never passed to onStatus() — confirm upstream.
200 }
201 } else {
// Connect completed immediately: the physical socket is moved into a new
// SocketConnection. Ownership of the raw pointer is not visible here;
// presumably the connection manages its own lifetime — TODO confirm.
202 SocketConnection* socketConnection =
203 new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
204
205 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
206 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
207 << socketConnection->getRemoteAddress().toString();
208
// NOTE(review): embedded line 209 is absent from this listing. connectEvent()
// reports STATE_OK through onStatus() at the equivalent point — confirm
// whether the same happens here.
210
211 onConnect(socketConnection);
212 onConnected(socketConnection);
213 }
214 }
215 }
216 }
217 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
// Convert the exception into a State (state, errnum, message) and report it
// with a default-constructed address.
218 core::socket::State state =
219 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
220
221 LOG(ERROR) << state.what();
222
223 onStatus({}, state);
224 }
225 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
// Same conversion as the inner handler, for failures outside the inner try.
226 core::socket::State state =
227 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
228
229 LOG(ERROR) << state.what();
230
231 onStatus({}, state);
232 }
233 } else {
234 LOG(DEBUG) << config->getInstanceName() << ": disabled";
235
236 onStatus({}, core::socket::STATE_DISABLED);
237 }
238
// NOTE(review): embedded lines 240 and 242 are absent from this listing, so
// both branches of this isEnabled() check look empty — confirm upstream.
239 if (isEnabled()) {
241 } else {
243 }
244 }
245
// Handles the connect event for the descriptor registered in init().
//
// Reads the pending socket error with getSockError() and then:
//   * error == 0            -> creates the SocketConnection, reports STATE_OK,
//                              fires onConnect()/onConnected() and disable()s
//                              this receiver;
//   * still in progress     -> only logs and keeps waiting;
//   * any other error       -> maps it to STATE_ERROR/STATE_FATAL, reports it
//                              (with NO_RETRY OR-ed in when useNext() returns
//                              true) and disable()s this receiver;
//   * getSockError() failed -> reports STATE_FATAL and disable()s.
//
// NOTE(review): embedded line 297 is absent from this listing (marked below).
246 template <typename PhysicalSocketClient,
247 typename Config,
248 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
249 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectEvent() {
250 int cErrno = 0;
251
252 if (physicalClientSocket.getSockError(cErrno) == 0) { // == 0->return valid : < 0->getsockopt failed
// Per the original inline comment, this makes errno equal to cErrno, so the
// errno checks and PLOG calls below refer to the socket's error.
253 const utils::PreserveErrno pe(cErrno); // errno = cErrno
254
255 if (errno == 0) {
// Connection established: the physical socket is moved into a new
// SocketConnection. Ownership of the raw pointer is not visible here —
// TODO confirm.
256 SocketConnection* socketConnection = new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
257
258 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
259 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
260 << socketConnection->getRemoteAddress().toString();
261
262 onStatus(remoteAddress, core::socket::STATE_OK);
263
264 onConnect(socketConnection);
265 onConnected(socketConnection);
266
267 disable();
268 } else if (PhysicalClientSocket::connectInProgress(errno)) {
// Not finished yet: the receiver stays enabled and waits for another event.
269 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": in progress:";
270 } else {
// Copy the failed address before useNext() is called.
271 SocketAddress currentRemoteAddress = remoteAddress;
272
273 core::socket::State state = core::socket::STATE_OK;
274
// Same errno classification as the connect() failure path in init().
275 switch (errno) {
276 case EADDRINUSE:
277 case EADDRNOTAVAIL:
278 case ECONNREFUSED:
279 case ENETUNREACH:
280 case ENOENT:
281 case EHOSTDOWN:
282 state = core::socket::STATE_ERROR;
283 break;
284 default:
285 state = core::socket::STATE_FATAL;
286 break;
287 }
288
289 if (remoteAddress.useNext()) {
// NOTE(review): the literal " connect '" opens a quote that is never closed.
// This PLOG also runs after useNext(), so it presumably prints the next
// address rather than the failed one (currentRemoteAddress) — confirm.
290 PLOG(DEBUG) << config->getInstanceName() << " connect '" << remoteAddress.toString();
291
292 onStatus(currentRemoteAddress, (state | core::socket::State::NO_RETRY));
293
294 LOG(DEBUG) << config->getInstanceName()
295 << " using next SocketAddress: " << config->Remote::getSocketAddress().toString();
296
// NOTE(review): embedded line 297 is absent from this listing — confirm what
// precedes disable() upstream.
298
299 disable();
300 } else {
301 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
302
303 onStatus(currentRemoteAddress, state);
304
305 disable();
306 }
307 }
308 } else {
// getSockError() itself failed (per the inline comment above: getsockopt
// failed); reported as fatal.
309 PLOG(DEBUG) << config->getInstanceName() << " getsockopt syscall error: '" << remoteAddress.toString() << "'";
310
311 onStatus(remoteAddress, core::socket::STATE_FATAL);
312 disable();
313 }
314 }
315
// Hook named unobservedEvent(); presumably invoked by the event receiver base
// once this object is no longer observed — TODO confirm against the base class.
//
// NOTE(review): embedded line 320 is absent from this listing, so the body
// looks empty here — confirm against upstream.
316 template <typename PhysicalSocketClient,
317 typename Config,
318 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
319 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::unobservedEvent() {
321 }
322
// Handles a connect timeout for the current remote address.
//
// Logs the timeout, then asks remoteAddress.useNext() for another address:
//   * true  -> logs the next address taken from the config;
//   * false -> sets errno to ETIMEDOUT and reports STATE_ERROR for the address
//              that timed out.
//
// NOTE(review): embedded lines 334 and 342 are absent from this listing
// (marked below) — confirm against upstream.
323 template <typename PhysicalSocketClient,
324 typename Config,
325 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
326 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectTimeout() {
327 LOG(TRACE) << config->getInstanceName() << " connect timeout " << remoteAddress.toString();
328
// Copy the timed-out address before useNext() is called.
329 SocketAddress currentRemoteAddress = remoteAddress;
330 if (remoteAddress.useNext()) {
331 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '" << config->Remote::getSocketAddress().toString()
332 << "'";
333
// NOTE(review): embedded line 334 is absent from this listing. As shown, no
// onStatus() call is visible on this branch.
335 } else {
336 LOG(DEBUG) << config->getInstanceName() << " connect timeout '" << remoteAddress.toString() << "'";
// errno is set explicitly before the status callback runs; presumably so that
// consumers of the state see ETIMEDOUT — TODO confirm.
337 errno = ETIMEDOUT;
338
339 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
340 }
341
// NOTE(review): embedded line 342 is absent from this listing — confirm what
// runs after both branches upstream.
343 }
344
// Destroys this connector by executing `delete this`.
//
// The object must not be accessed after this call returns. `delete this` is
// only valid for objects created with `new`; presumably every SocketConnector
// is heap-allocated — TODO confirm against the call sites.
345 template <typename PhysicalSocketClient,
346 typename Config,
347 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
348 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::destruct() {
349 delete this;
350 }
351
352} // namespace core::socket::stream
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:61
Socket(const std::shared_ptr< Config > &config)
Definition Socket.hpp:56
Config & getConfig() const
Definition Socket.hpp:65
std::shared_ptr< Config > config
Definition Socket.h:75
Socket(const std::string &name)
Definition Socket.hpp:51
std::string what() const
Definition State.cpp:114
static constexpr int NO_RETRY
Definition State.h:59
State operator|(int state)
Definition State.cpp:102
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * socketContext
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
std::function< void(SocketConnection *)> onConnected
std::shared_ptr< core::socket::stream::SocketContextFactory > socketContextFactory
std::function< void(SocketConnection *)> onDisconnect
std::function< void(SocketConnection *)> onConnect
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
SocketConnector(const SocketConnector &socketConnector)
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
State
Definition State.h:51
State eventLoopState()
Definition State.cpp:52
std::string sigabbrev_np(int sig)
Definition signal.cpp:59