SNode.C
Loading...
Searching...
No Matches
SocketConnection.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/socket/stream/SocketConnection.h"
43#include "core/socket/stream/SocketContext.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49#include "utils/system/signal.h"
50
51#include <iomanip>
52#include <utility>
53
54#endif /* DOXYGEN_SHOULD_SKIP_THIS */
55
56namespace core::socket::stream {
57
58 template <typename SocketAddress, typename PhysicalSocket, typename Config>
59 SocketAddress getLocalSocketAddress(PhysicalSocket& physicalSocket, Config& config) {
60 typename SocketAddress::SockAddr localSockAddr;
61 typename SocketAddress::SockLen localSockAddrLen = sizeof(typename SocketAddress::SockAddr);
62
63 SocketAddress localPeerAddress;
64 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
65 try {
66 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
67 LOG(TRACE) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
68 << " PeerAddress (local): " << localPeerAddress.toString();
69 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
70 LOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
71 << " PeerAddress (local): " << badSocketAddress.what();
72 }
73 } else {
74 PLOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
75 << " PeerAddress (local) not retrievable";
76 }
77
78 return localPeerAddress;
79 }
80
81 template <typename SocketAddress, typename PhysicalSocket, typename Config>
82 SocketAddress getRemoteSocketAddress(PhysicalSocket& physicalSocket, Config& config) {
83 typename SocketAddress::SockAddr remoteSockAddr;
84 typename SocketAddress::SockLen remoteSockAddrLen = sizeof(typename SocketAddress::SockAddr);
85
86 SocketAddress remotePeerAddress;
87 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
88 try {
89 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
90 LOG(TRACE) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
91 << " PeerAddress (remote): " << remotePeerAddress.toString();
92 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
93 LOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
94 << " PeerAddress (remote): " << badSocketAddress.what();
95 }
96 } else {
97 PLOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
98 << " PeerAddress (remote) not retrievble";
99 }
100
101 return remotePeerAddress;
102 }
103
/**
 * Construct a connection around an already established physical socket.
 *
 * Visible initializer list: the SocketConnection base receives the descriptor and the raw
 * Config pointer; SocketReader and SocketWriter each get an error callback plus timeout,
 * block size and terminate timeout read from config; the physical socket is moved into the
 * member of the same name; onDisconnect and config are stored; localAddress and
 * remoteAddress are resolved through getLocalSocketAddress() and getRemoteSocketAddress().
 *
 * The body enables the reader, then the writer, and suspends the writer on success. If
 * either enable() call returns false the object executes `delete this`.
 *
 * NOTE(review): this listing skips embedded source lines 110 and 128, so anything passed to
 * SocketReader/SocketWriter ahead of the error callback is not visible - confirm against
 * the real file.
 * NOTE(review): `delete this` inside the constructor presumably requires instances to be
 * created with plain `new` and not be used by the creator afterwards - verify with callers.
 */
104 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
105 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketConnectionT(PhysicalSocket&& physicalSocket,
106 const std::function<void()>& onDisconnect,
107 const std::shared_ptr<Config>& config)
108 : SocketConnection(physicalSocket.getFd(), config.get())
109 , SocketReader(
// Read-error callback: log, stop observing reads, then forward errnum to onReadError().
111 [this](int errnum) {
112 {
// PreserveErrno presumably installs errnum as errno for this scope - TODO confirm.
113 const utils::PreserveErrno pe(errnum);
// An errno of 0 is reported as EOF rather than as an error.
114 if (errno == 0) {
115 LOG(TRACE) << connectionName << " OnReadError: EOF received";
116 } else {
117 PLOG(TRACE) << connectionName << " OnReadError";
118 }
119 }
120 SocketReader::disable();
121
122 onReadError(errnum);
123 },
124 config->getReadTimeout(),
125 config->getReadBlockSize(),
126 config->getTerminateTimeout())
127 , SocketWriter(
// Write-error callback: log, stop observing writes, then forward errnum to onWriteError().
129 [this](int errnum) {
130 {
131 const utils::PreserveErrno pe(errnum);
132 PLOG(TRACE) << connectionName << " OnWriteError";
133 }
134 SocketWriter::disable();
135
136 onWriteError(errnum);
137 },
138 config->getWriteTimeout(),
139 config->getWriteBlockSize(),
140 config->getTerminateTimeout())
// The parameter is moved from here; later initializers and the body use this->physicalSocket.
141 , physicalSocket(std::move(physicalSocket))
142 , onDisconnect(onDisconnect)
143 , localAddress(getLocalSocketAddress<SocketAddress>(this->physicalSocket, config))
144 , remoteAddress(getRemoteSocketAddress<SocketAddress>(this->physicalSocket, config))
145 , config(config) {
// Enable reading first, then writing; a failure of either destroys the object.
146 if (!SocketReader::enable(this->physicalSocket.getFd())) {
147 delete this;
148 } else if (!SocketWriter::enable(this->physicalSocket.getFd())) {
149 delete this;
150 } else {
// The writer starts out suspended.
151 SocketWriter::suspend();
152 }
153 }
154
155 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
156 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::~SocketConnectionT() {
157 }
158
159 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
160 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::setTimeout(const utils::Timeval& timeout) {
161 setReadTimeout(timeout);
162 setWriteTimeout(timeout);
163 }
164
165 template <typename PhysicalSocketT, typename SocketReaderT, typename SocketWriterT, typename ConfigT>
166 void SocketConnectionT<PhysicalSocketT, SocketReaderT, SocketWriterT, ConfigT>::setReadTimeout(const utils::Timeval& timeout) {
167 SocketReader::setTimeout(timeout);
168 }
169 template <typename PhysicalSocketT, typename SocketReaderT, typename SocketWriterT, typename ConfigT>
170 void SocketConnectionT<PhysicalSocketT, SocketReaderT, SocketWriterT, ConfigT>::setWriteTimeout(const utils::Timeval& timeout) {
171 SocketWriter::setTimeout(timeout);
172 }
173
174 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
175 int SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getFd() const {
176 return physicalSocket.getFd();
177 }
178
179 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
180 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
181 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getBindAddress() const {
182 return physicalSocket.getBindAddress();
183 }
/**
 * Accessor for the local address of this connection.
 *
 * NOTE(review): the function body (embedded source lines 188-190) is missing from this
 * listing; presumably it returns the localAddress member set in the constructor - confirm
 * against the real file.
 */
185 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
186 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
187 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getLocalAddress() const {
191 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
192 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
193 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getRemoteAddress() const {
194 return remoteAddress;
195 }
196
/**
 * Read received data into a caller-supplied buffer.
 *
 * @param chunk    Destination buffer handed to SocketReader::readFromPeer().
 * @param chunkLen Length handed to SocketReader::readFromPeer() together with chunk.
 * @return The value returned by SocketReader::readFromPeer(), or 0 while newSocketContext
 *         is set, in which case nothing is read.
 *
 * NOTE(review): the closing braces (embedded source lines 205 and 208) are missing from
 * this listing.
 */
197 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
198 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::readFromPeer(char* chunk, std::size_t chunkLen) {
199 std::size_t ret = 0;
// Only read while no SocketContext switch is pending.
201 if (newSocketContext == nullptr) {
202 ret = SocketReader::readFromPeer(chunk, chunkLen);
203 } else {
204 LOG(TRACE) << connectionName << " ReadFromPeer: New SocketContext != nullptr: SocketContextSwitch still in progress";
206
207 return ret;
209
210 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
211 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::sendToPeer(const char* chunk, std::size_t chunkLen) {
212 SocketWriter::sendToPeer(chunk, chunkLen);
213 }
215 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
216 bool SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::streamToPeer(core::pipe::Source* source) {
217 return SocketWriter::streamToPeer(source);
218 }
219
220 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
221 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::streamEof() {
222 SocketWriter::streamEof();
223 }
225 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
226 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::shutdownRead() {
227 LOG(TRACE) << connectionName << ": Shutdown (RD)";
228
229 SocketReader::shutdownRead();
230
231 if (physicalSocket.shutdown(PhysicalSocket::SHUT::RD) == 0) {
232 LOG(DEBUG) << connectionName << " Shutdown (RD): success";
233 } else {
234 PLOG(ERROR) << connectionName << " Shutdown (RD)";
235 }
236 }
237
238 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
239 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::shutdownWrite() {
240 if (!SocketWriter::shutdownInProgress) {
241 LOG(TRACE) << connectionName << ": Stop writing";
242
243 SocketWriter::shutdownWrite([this]() {
244 if (SocketWriter::isEnabled()) {
245 SocketWriter::disable();
246 }
247 });
248 }
249 }
250
251 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
252 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::close() {
253 if (SocketWriter::isEnabled()) {
254 SocketWriter::disable();
255 }
256 if (SocketReader::isEnabled()) {
257 SocketReader::disable();
258 }
259 }
260
261 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
262 Config* SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getConfig() const {
263 return config.get();
264 }
265
266 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
267 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalSent() const {
268 return SocketWriter::getTotalSent();
269 }
270
271 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
272 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalQueued() const {
273 return SocketWriter::getTotalQueued();
274 }
275
276 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
277 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalRead() const {
278 return SocketReader::getTotalRead();
279 }
280
281 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
282 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalProcessed() const {
283 return SocketReader::getTotalProcessed();
284 }
285
286 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
287 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::doWriteShutdown(const std::function<void()>& onShutdown) {
288 errno = 0;
289
290 setTimeout(SocketWriter::terminateTimeout);
291
292 LOG(TRACE) << connectionName << ": Shutdown (WR)";
293
294 if (physicalSocket.shutdown(PhysicalSocket::SHUT::WR) == 0) {
295 LOG(DEBUG) << connectionName << " Shutdown (WR): success";
296 } else {
297 PLOG(ERROR) << connectionName << " Shutdown (WR)";
298 }
299
300 onShutdown();
301 }
302
/**
 * Let the current SocketContext process received data.
 *
 * @param available Amount of data reported as available by the caller.
 *
 * If data was available but socketContext->readFromPeer() returned 0, the connection is
 * closed and newSocketContext is deleted and reset. Otherwise a non-null newSocketContext
 * is handled in the second branch, which resets the pointer and logs completion.
 *
 * NOTE(review): embedded source lines 315, 317 and 320 are missing from this listing, so
 * the statements that presumably carry out the actual context switch are not visible -
 * confirm against the real file.
 */
303 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
304 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onReceivedFromPeer(std::size_t available) {
305 std::size_t consumed = socketContext->readFromPeer();
306
// Data was reported as available but the context read nothing: close the connection.
307 if (available != 0 && consumed == 0) {
308 LOG(TRACE) << connectionName << ": Data available: " << available << " but nothing read";
309
310 close();
311
// Discard a context that was waiting to be switched in.
312 delete newSocketContext;
313 newSocketContext = nullptr;
314 } else if (newSocketContext != nullptr) { // Perform a pending SocketContextSwitch
316
318 newSocketContext = nullptr;
319
321
322 LOG(DEBUG) << connectionName << " SocketConnection: switch completed";
323 }
324 }
325
/**
 * Hook called with the error number after a write error; the constructor's SocketWriter
 * error callback invokes it once the writer has been disabled.
 *
 * NOTE(review): the function body (embedded source line 328) is missing from this listing.
 */
326 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
327 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onWriteError(int errnum) {
329 }
330
/**
 * Hook called with the error number after a read error; the constructor's SocketReader
 * error callback invokes it once the reader has been disabled.
 *
 * NOTE(review): the function body (embedded source line 333) is missing from this listing.
 */
331 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
332 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onReadError(int errnum) {
334 }
335
336 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
337 bool SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onSignal(int signum) {
338 switch (signum) {
339 case SIGINT:
340 [[fallthrough]];
341 case SIGTERM:
342 [[fallthrough]];
343 case SIGABRT:
344 [[fallthrough]];
345 case SIGHUP:
346 LOG(DEBUG) << connectionName << ": Shutting down due to signal '" << utils::system::strsignal(signum) << "' (SIG"
347 << utils::system::sigabbrev_np(signum) << " [" << signum << "])";
348 break;
349 case SIGALRM:
350 break;
351 }
352
353 return socketContext != nullptr ? socketContext->onSignal(signum) : true;
354 }
355
356 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
357 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::readTimeout() {
358 LOG(WARNING) << connectionName << ": Read timeout";
359 close();
360 }
361
362 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
363 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::writeTimeout() {
364 LOG(WARNING) << connectionName << ": Write timeout";
365 close();
366 }
367
/**
 * Final teardown step of the connection.
 *
 * Invokes the stored onDisconnect callback, logs the disconnect and then destroys the
 * object with `delete this`; the object must not be accessed after this call returns.
 *
 * NOTE(review): the body of the `socketContext != nullptr` branch (embedded source line
 * 371) is missing from this listing - confirm against the real file what it does with the
 * context.
 */
368 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
369 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::unobservedEvent() {
370 if (socketContext != nullptr) {
372 }
373
374 onDisconnect();
375
376 LOG(DEBUG) << connectionName << ": disconnected";
377
// Self-destruction: nothing may touch members after this line.
378 delete this;
379 }
380
381} // namespace core::socket::stream
#define LOG(level)
Definition Logger.h:148
#define PLOG(level)
Definition Logger.h:152
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:61
const std::shared_ptr< Config > config
Definition Socket.h:75
Socket(const std::shared_ptr< Config > &config)
Definition Socket.hpp:56
Socket(const std::string &name)
Definition Socket.hpp:51
Config * getConfig() const
Definition Socket.hpp:65
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
const SocketAddress & getBindAddress() const final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
void onReadError(int errnum) override
void onWriteError(int errnum) override
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
LogMessage(Level level, int verboseLevel=-1, bool withErrno=false)
Definition Logger.cpp:280
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string strsignal(int sig)
Definition signal.cpp:71
std::string sigabbrev_np(int sig)
Definition signal.cpp:60