SNode.C
Loading...
Searching...
No Matches
SocketConnector.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49
50#include <iomanip>
51#include <string>
52#include <utility>
53
54#endif // DOXYGEN_SHOULD_SKIP_THIS
55
56namespace core::socket::stream {
57
58 template <typename PhysicalSocketClient,
59 typename Config,
60 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
61 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::SocketConnector(
62 const std::function<void(SocketConnection*)>& onConnect,
63 const std::function<void(SocketConnection*)>& onConnected,
64 const std::function<void(SocketConnection*)>& onDisconnect,
65 const std::function<void(const SocketAddress&, core::socket::State)>& onStatus,
66 const std::shared_ptr<Config>& config)
67 : core::eventreceiver::ConnectEventReceiver(config->getInstanceName() + " SocketConnector", 0)
68 , onConnect(onConnect)
69 , onConnected(onConnected)
70 , onDisconnect(onDisconnect)
71 , onStatus(onStatus)
72 , config(config) {
73 }
74
75 template <typename PhysicalSocketServer,
76 typename Config,
77 template <typename ConfigT, typename PhysicalSocketServerT> typename SocketConnection>
78 SocketConnector<PhysicalSocketServer, Config, SocketConnection>::SocketConnector(const SocketConnector& socketConnector)
79 : core::eventreceiver::ConnectEventReceiver(socketConnector.config->getInstanceName() + " SocketConnector", 0)
80 , onConnect(socketConnector.onConnect)
81 , onConnected(socketConnector.onConnected)
82 , onDisconnect(socketConnector.onDisconnect)
83 , onStatus(socketConnector.onStatus)
84 , config(socketConnector.config) {
85 }
86
87 template <typename PhysicalSocketClient,
88 typename Config,
89 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
90 SocketConnector<PhysicalSocketClient, Config, SocketConnection>::~SocketConnector() {
91 }
92
/**
 * Start a connection attempt according to the configuration.
 *
 * If the configuration is not disabled, the local (bind) and remote addresses are read from
 * the configuration, the physical socket is opened with the NONBLOCK flag, bound to the
 * local address and connected to the remote address. Every failing step is logged and
 * reported through onStatus() with a state derived from errno (STATE_ERROR for the errno
 * values listed in the respective switch, STATE_FATAL for everything else).
 * A SocketAddress::BadSocketAddress exception is converted into a state and reported
 * through onStatus() with a default-constructed address.
 * If the configuration is disabled, STATE_DISABLED is reported instead.
 *
 * NOTE(review): this listing is missing several original lines (165, 182, 186, 192, 223,
 * 225). The affected places are marked below; verify against the upstream source before
 * changing any logic.
 */
93 template <typename PhysicalSocketClient,
94 typename Config,
95 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
96 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::init() {
97 if (!config->getDisabled()) {
98 try {
99 core::socket::State state = core::socket::STATE_OK;
100
101 LOG(DEBUG) << config->getInstanceName() << " Connect: starting";
102
// Local address used for bind(); a BadSocketAddress thrown here is handled by the outer catch.
103 SocketAddress bindAddress = config->Local::getSocketAddress();
104
105 try {
// Remote address used for connect(); a BadSocketAddress thrown here is handled by the inner catch.
106 remoteAddress = config->Remote::getSocketAddress();
107
// Step 1: open the socket with the NONBLOCK flag.
108 if (physicalClientSocket.open(config->getSocketOptions(), PhysicalClientSocket::Flags::NONBLOCK) < 0) {
109 PLOG(DEBUG) << config->getInstanceName() << " open " << bindAddress.toString();
110
// Descriptor/buffer/memory exhaustion is reported as STATE_ERROR, anything else as STATE_FATAL.
111 switch (errno) {
112 case EMFILE:
113 case ENFILE:
114 case ENOBUFS:
115 case ENOMEM:
116 state = core::socket::STATE_ERROR;
117 break;
118 default:
119 state = core::socket::STATE_FATAL;
120 break;
121 }
122
123 onStatus(bindAddress, state);
124 } else {
125 LOG(TRACE) << config->getInstanceName() << " open " << bindAddress.toString() << ": success";
126
// Step 2: bind to the local address; only EADDRINUSE maps to STATE_ERROR.
127 if (physicalClientSocket.bind(bindAddress) < 0) {
128 PLOG(DEBUG) << config->getInstanceName() << " bind " << bindAddress.toString();
129
130 switch (errno) {
131 case EADDRINUSE:
132 state = core::socket::STATE_ERROR;
133 break;
134 default:
135 state = core::socket::STATE_FATAL;
136 break;
137 }
138
139 onStatus(bindAddress, state);
140 } else {
141 LOG(TRACE) << config->getInstanceName() << " bind " << bindAddress.toString() << ": success";
142
// Step 3: connect. A negative return that connectInProgress(errno) accepts is not treated as a failure.
143 if (physicalClientSocket.connect(remoteAddress) < 0 && !PhysicalClientSocket::connectInProgress(errno)) {
144 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
145 switch (errno) {
146 case EADDRINUSE:
147 case EADDRNOTAVAIL:
148 case ECONNREFUSED:
149 case ENETUNREACH:
150 case ENOENT:
151 case EHOSTDOWN:
152 state = core::socket::STATE_ERROR;
153 break;
154 default:
155 state = core::socket::STATE_FATAL;
156 break;
157 }
158
// Copy the address before useNext() is called so the failure is reported with this copy.
159 SocketAddress currentRemoteAddress = remoteAddress;
160 if (remoteAddress.useNext()) {
// useNext() returned true: report the failure combined with the NO_RETRY flag.
161 onStatus(currentRemoteAddress, state | core::socket::State::NO_RETRY);
162
163 LOG(INFO) << config->getInstanceName() << ": Using next SocketAddress: " << remoteAddress.toString();
164
// NOTE(review): original line 165 is missing from this listing; presumably it starts the
// attempt for the next address - confirm against upstream.
166 } else {
167 onStatus(currentRemoteAddress, state);
168 }
169 } else {
170 LOG(TRACE) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
171
// Connect still pending: register the descriptor via enable(); the result arrives later
// (presumably through connectEvent()/connectTimeout() - confirm).
172 if (PhysicalClientSocket::connectInProgress(errno)) {
173 if (enable(physicalClientSocket.getFd())) {
174 LOG(DEBUG)
175 << config->getInstanceName() << " enable " << remoteAddress.toString(false) << ": success";
176 } else {
177 LOG(ERROR) << config->getInstanceName() << " enable " << remoteAddress.toString()
178 << ": failed. No valid descriptor created";
179
180 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED, "SocketConnector not enabled");
181
// NOTE(review): original line 182 is missing from this listing; presumably the state set
// above is reported through onStatus() there - confirm.
183 }
184 } else {
// Connect completed immediately.
// NOTE(review): the initializer on original line 186 is missing from this listing;
// connectEvent() builds the connection with
// new SocketConnection(std::move(physicalClientSocket), onDisconnect, config) - confirm it matches.
185 SocketConnection* socketConnection =
187
188 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
189 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
190 << socketConnection->getRemoteAddress().toString();
191
// NOTE(review): original line 192 is missing from this listing; connectEvent() reports
// STATE_OK through onStatus() at the equivalent position - confirm.
193
194 onConnect(socketConnection);
195 onConnected(socketConnection);
196 }
197 }
198 }
199 }
// Handles BadSocketAddress thrown inside the inner try (e.g. while obtaining the remote address).
200 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
201 core::socket::State state =
202 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
203
204 LOG(ERROR) << state.what();
205
206 onStatus({}, state);
207 }
// Handles BadSocketAddress thrown outside the inner try (e.g. while obtaining the bind address).
208 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
209 core::socket::State state =
210 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
211
212 LOG(ERROR) << state.what();
213
214 onStatus({}, state);
215 }
216 } else {
217 LOG(DEBUG) << config->getInstanceName() << ": disabled";
218
219 onStatus({}, core::socket::STATE_DISABLED);
220 }
221
// NOTE(review): both branch bodies (original lines 223 and 225) are missing from this
// listing; what happens for an enabled versus a not-enabled connector cannot be told from here.
222 if (isEnabled()) {
224 } else {
226 }
227 }
228
/**
 * Evaluate the outcome of a pending connect.
 *
 * The socket error is fetched with getSockError(). Depending on its value the function
 *  - creates the SocketConnection, reports STATE_OK, invokes onConnect()/onConnected() and
 *    disables this connector (error value 0),
 *  - only logs that the connect is still in progress, or
 *  - maps the error to STATE_ERROR/STATE_FATAL, reports it through onStatus() and disables
 *    this connector; when remoteAddress.useNext() returns true the state is additionally
 *    combined with the NO_RETRY flag.
 * If getSockError() itself fails, STATE_FATAL is reported and the connector is disabled.
 *
 * NOTE(review): original line 280 is missing from this listing (marked below).
 */
229 template <typename PhysicalSocketClient,
230 typename Config,
231 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
232 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectEvent() {
233 int cErrno = 0;
234
235 if (physicalClientSocket.getSockError(cErrno) == 0) { // == 0->return valid : < 0->getsockopt failed
236 const utils::PreserveErrno pe(cErrno); // errno = cErrno
237
238 if (errno == 0) {
// The physical socket is moved into the new connection object.
// NOTE(review): raw new - who deletes the connection is not visible in this file; confirm ownership.
239 SocketConnection* socketConnection = new SocketConnection(std::move(physicalClientSocket), onDisconnect, config);
240
241 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": success";
242 LOG(DEBUG) << " " << socketConnection->getLocalAddress().toString() << " -> "
243 << socketConnection->getRemoteAddress().toString();
244
245 onStatus(remoteAddress, core::socket::STATE_OK);
246
247 onConnect(socketConnection);
248 onConnected(socketConnection);
249
250 disable();
251 } else if (PhysicalClientSocket::connectInProgress(errno)) {
// Still pending: nothing is reported and the connector is not disabled in this branch.
252 LOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString() << ": in progress:";
253 } else {
// Copy the address before useNext() is called so the failure is reported with this copy.
254 SocketAddress currentRemoteAddress = remoteAddress;
255
256 core::socket::State state = core::socket::STATE_OK;
257
// Same errno classification as the connect step in init().
258 switch (errno) {
259 case EADDRINUSE:
260 case EADDRNOTAVAIL:
261 case ECONNREFUSED:
262 case ENETUNREACH:
263 case ENOENT:
264 case EHOSTDOWN:
265 state = core::socket::STATE_ERROR;
266 break;
267 default:
268 state = core::socket::STATE_FATAL;
269 break;
270 }
271
272 if (remoteAddress.useNext()) {
// NOTE(review): this log uses remoteAddress after useNext() was called, whereas the failed
// attempt is reported with currentRemoteAddress; if useNext() advances remoteAddress the
// message names the next address - confirm. The message also opens a quote (') that is
// never closed.
273 PLOG(DEBUG) << config->getInstanceName() << " connect '" << remoteAddress.toString();
274
275 onStatus(currentRemoteAddress, (state | core::socket::State::NO_RETRY));
276
277 LOG(DEBUG) << config->getInstanceName()
278 << " using next SocketAddress: " << config->Remote::getSocketAddress().toString();
279
// NOTE(review): original line 280 is missing from this listing; presumably it starts the
// attempt for the next address - confirm against upstream.
281
282 disable();
283 } else {
284 PLOG(DEBUG) << config->getInstanceName() << " connect " << remoteAddress.toString();
285
286 onStatus(currentRemoteAddress, state);
287
288 disable();
289 }
290 }
291 } else {
// getSockError() did not return 0: treated as fatal.
292 PLOG(DEBUG) << config->getInstanceName() << " getsockopt syscall error: '" << remoteAddress.toString() << "'";
293
294 onStatus(remoteAddress, core::socket::STATE_FATAL);
295 disable();
296 }
297 }
298
/**
 * Handler for the unobserved event of this connector.
 *
 * NOTE(review): the only body line (original line 303) is missing from this listing, so
 * the behaviour cannot be described from here; consult the upstream source.
 */
299 template <typename PhysicalSocketClient,
300 typename Config,
301 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
302 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::unobservedEvent() {
304 }
305
/**
 * Handle a connect timeout.
 *
 * Logs the timeout and copies the current remote address. If remoteAddress.useNext()
 * returns true only the next address is logged in the visible code; otherwise errno is
 * set to ETIMEDOUT and STATE_ERROR is reported through onStatus() for the copied address.
 *
 * NOTE(review): original lines 317 and 325 are missing from this listing (marked below).
 */
306 template <typename PhysicalSocketClient,
307 typename Config,
308 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
309 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::connectTimeout() {
310 LOG(TRACE) << config->getInstanceName() << " connect timeout " << remoteAddress.toString();
311
// Copy the address before useNext() is called so the timeout is reported with this copy.
312 SocketAddress currentRemoteAddress = remoteAddress;
313 if (remoteAddress.useNext()) {
314 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '" << config->Remote::getSocketAddress().toString()
315 << "'";
316
// NOTE(review): original line 317 is missing from this listing; presumably it starts the
// attempt for the next address - confirm against upstream.
318 } else {
319 LOG(DEBUG) << config->getInstanceName() << " connect timeout '" << remoteAddress.toString() << "'";
// errno is set explicitly before the status callback is invoked.
320 errno = ETIMEDOUT;
321
322 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
323 }
324
// NOTE(review): original line 325 is missing from this listing; connectEvent() calls
// disable() at the end of its failure paths - confirm whether the same happens here.
326 }
327
328 template <typename PhysicalSocketClient,
329 typename Config,
330 template <typename ConfigT, typename PhysicalSocketClientT> typename SocketConnection>
331 void SocketConnector<PhysicalSocketClient, Config, SocketConnection>::destruct() {
332 delete this;
333 }
334
335} // namespace core::socket::stream
void setTimeout(const utils::Timeval &timeout)
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:61
Socket(const std::shared_ptr< Config > &config)
Definition Socket.hpp:56
Config & getConfig() const
Definition Socket.hpp:65
std::shared_ptr< Config > config
Definition Socket.h:75
Socket(const std::string &name)
Definition Socket.hpp:51
std::string what() const
Definition State.cpp:114
static constexpr int NO_RETRY
Definition State.h:59
State operator|(int state)
Definition State.cpp:102
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
typename PhysicalClientSocket::SocketAddress SocketAddress
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
SocketConnectionT< PhysicalClientSocket, Config > SocketConnection
std::function< void(SocketConnection *)> onConnected
std::function< void(SocketConnection *)> onDisconnect
PhysicalSocketClientT PhysicalClientSocket
std::function< void(SocketConnection *)> onConnect
SocketConnector(const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
void onReadError(int errnum) override
void onWriteError(int errnum) override
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
typename Super::SocketAddress SocketAddress
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnector< PhysicalClientSocketT, ConfigT, core::socket::stream::legacy::SocketConnection > Super
SocketConnector(const SocketConnector &socketConnector)
typename Super::SocketConnection SocketConnection
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
State
Definition State.h:51
@ RUNNING
Definition State.h:51
State eventLoopState()
Definition State.cpp:52
std::string sigabbrev_np(int sig)
Definition signal.cpp:59
Definition Config.h:59