SNode.C
Loading...
Searching...
No Matches
SocketConnection.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/socket/stream/SocketConnection.h"
43#include "core/socket/stream/SocketContext.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49#include "utils/system/signal.h"
50
51#include <cstddef>
52#include <iomanip>
53#include <string>
54#include <utility>
55
56#endif /* DOXYGEN_SHOULD_SKIP_THIS */
57
58namespace core::socket::stream {
59
60 template <typename SocketAddress, typename PhysicalSocket, typename Config>
61 SocketAddress getLocalSocketAddress(PhysicalSocket& physicalSocket, Config& config) {
62 typename SocketAddress::SockAddr localSockAddr;
63 typename SocketAddress::SockLen localSockAddrLen = sizeof(typename SocketAddress::SockAddr);
64
65 SocketAddress localPeerAddress;
66 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
67 try {
68 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
69 LOG(TRACE) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
70 << " PeerAddress (local): " << localPeerAddress.toString();
71 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
72 LOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
73 << " PeerAddress (local): " << badSocketAddress.what();
74 }
75 } else {
76 PLOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
77 << " PeerAddress (local) not retrievable";
78 }
79
80 return localPeerAddress;
81 }
82
83 template <typename SocketAddress, typename PhysicalSocket, typename Config>
84 SocketAddress getRemoteSocketAddress(PhysicalSocket& physicalSocket, Config& config) {
85 typename SocketAddress::SockAddr remoteSockAddr;
86 typename SocketAddress::SockLen remoteSockAddrLen = sizeof(typename SocketAddress::SockAddr);
87
88 SocketAddress remotePeerAddress;
89 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
90 try {
91 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
92 LOG(TRACE) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
93 << " PeerAddress (remote): " << remotePeerAddress.toString();
94 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
95 LOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
96 << " PeerAddress (remote): " << badSocketAddress.what();
97 }
98 } else {
99 PLOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
100 << " PeerAddress (remote) not retrievble";
101 }
102
103 return remotePeerAddress;
104 }
105
106 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
107 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketConnectionT(PhysicalSocket&& physicalSocket,
108 const std::function<void()>& onDisconnectm,
109 const std::shared_ptr<Config>& config)
110 : SocketConnection(physicalSocket.getFd(), config.get())
111 , SocketReader(
112 instanceName + " [" + std::to_string(physicalSocket.getFd()) + "]",
113 [this](int errnum) {
114 {
115 const utils::PreserveErrno pe(errnum);
116 if (errno == 0) {
117 LOG(TRACE) << connectionName << " OnReadError: EOF received";
118 } else {
119 PLOG(TRACE) << connectionName << " OnReadError";
120 }
121 }
122 SocketReader::disable();
123
124 onReadError(errnum);
125 },
126 config->getReadTimeout(),
127 config->getReadBlockSize(),
128 config->getTerminateTimeout())
129 , SocketWriter(
130 instanceName + " [" + std::to_string(physicalSocket.getFd()) + "]",
131 [this](int errnum) {
132 {
133 const utils::PreserveErrno pe(errnum);
134 PLOG(TRACE) << connectionName << " OnWriteError";
135 }
136 SocketWriter::disable();
137
138 onWriteError(errnum);
139 },
140 config->getWriteTimeout(),
141 config->getWriteBlockSize(),
142 config->getTerminateTimeout())
143 , physicalSocket(std::move(physicalSocket))
144 , onDisconnect(onDisconnectm)
145 , localAddress(getLocalSocketAddress<SocketAddress>(this->physicalSocket, config))
146 , remoteAddress(getRemoteSocketAddress<SocketAddress>(this->physicalSocket, config))
147 , config(config) {
148 if (!SocketReader::enable(this->physicalSocket.getFd())) {
149 delete this;
150 } else if (!SocketWriter::enable(this->physicalSocket.getFd())) {
151 delete this;
152 } else {
153 SocketWriter::suspend();
154 }
155 }
156
157 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
158 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::~SocketConnectionT() {
159 }
160
161 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
162 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::setTimeout(const utils::Timeval& timeout) {
163 SocketReader::setTimeout(timeout);
164 SocketWriter::setTimeout(timeout);
165 }
166
167 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
168 int SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getFd() const {
169 return physicalSocket.getFd();
170 }
172 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
173 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
174 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getLocalAddress() const {
176 }
177
178 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
179 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
180 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getRemoteAddress() const {
181 return remoteAddress;
184 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
185 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::readFromPeer(char* chunk, std::size_t chunkLen) {
186 return SocketReader::readFromPeer(chunk, chunkLen);
187 }
189 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
190 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::sendToPeer(const char* chunk, std::size_t chunkLen) {
191 SocketWriter::sendToPeer(chunk, chunkLen);
192 }
194 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
195 bool SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::streamToPeer(core::pipe::Source* source) {
196 return SocketWriter::streamToPeer(source);
197 }
199 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
200 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::streamEof() {
201 SocketWriter::streamEof();
202 }
204 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
205 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::shutdownRead() {
206 LOG(TRACE) << connectionName << ": Shutdown (RD)";
208 SocketReader::shutdownRead();
210 if (physicalSocket.shutdown(PhysicalSocket::SHUT::RD) == 0) {
211 LOG(DEBUG) << connectionName << " Shutdown (RD): success";
212 } else {
213 PLOG(ERROR) << connectionName << " Shutdown (RD)";
214 }
216
217 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
218 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::shutdownWrite(bool forceClose) {
219 if (!SocketWriter::shutdownInProgress) {
220 LOG(TRACE) << connectionName << ": Stop writing";
221
222 SocketWriter::shutdownWrite([forceClose, this]() {
223 if (SocketWriter::isEnabled()) {
224 SocketWriter::disable();
225 }
226 if (forceClose && SocketReader::isEnabled()) {
227 close();
228 }
229 });
230 }
231 }
232
233 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
234 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::close() {
235 if (SocketWriter::isEnabled()) {
236 SocketWriter::disable();
237 }
238 if (SocketReader::isEnabled()) {
239 SocketReader::disable();
240 }
241 }
242
243 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
244 Config& SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getConfig() const {
245 return *config;
246 }
247
248 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
249 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalSent() const {
250 return SocketWriter::getTotalSent();
251 }
252
253 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
254 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalQueued() const {
255 return SocketWriter::getTotalQueued();
256 }
257
258 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
259 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalRead() const {
260 return SocketReader::getTotalRead();
261 }
262
263 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
264 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::getTotalProcessed() const {
265 return SocketReader::getTotalProcessed();
266 }
267
268 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
269 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::doWriteShutdown(const std::function<void()>& onShutdown) {
270 errno = 0;
271
272 setTimeout(SocketWriter::terminateTimeout);
273
274 LOG(TRACE) << connectionName << ": Shutdown (WR)";
275
276 if (physicalSocket.shutdown(PhysicalSocket::SHUT::WR) == 0) {
277 LOG(DEBUG) << connectionName << " Shutdown (WR): success";
278 } else {
279 PLOG(ERROR) << connectionName << " Shutdown (WR)";
280 }
281
282 onShutdown();
283 }
284
285 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
286 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onReceivedFromPeer(std::size_t available) {
288 }
289
290 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
291 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onWriteError(int errnum) {
293 }
294
295 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
296 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onReadError(int errnum) {
298 }
299
300 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
301 bool SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onSignal(int signum) {
302 switch (signum) {
303 case SIGINT:
304 [[fallthrough]];
305 case SIGTERM:
306 [[fallthrough]];
307 case SIGABRT:
308 [[fallthrough]];
309 case SIGHUP:
310 LOG(DEBUG) << connectionName << ": Shutting down due to signal '" << strsignal(signum) << "' (SIG"
311 << utils::system::sigabbrev_np(signum) << " [" << signum << "])";
312 break;
313 case SIGALRM:
314 break;
315 }
316
317 return socketContext != nullptr ? socketContext->onSignal(signum) : true;
318 }
319
320 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
321 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::readTimeout() {
322 LOG(WARNING) << connectionName << ": Read timeout";
323 close();
324 }
325
326 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
327 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::writeTimeout() {
328 LOG(WARNING) << connectionName << ": Write timeout";
329 close();
330 }
331
332 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
333 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::unobservedEvent() {
334 if (socketContext != nullptr) {
336
337 LOG(DEBUG) << connectionName << ": SocketContext detached";
338 }
339
340 onDisconnect();
341
342 delete this;
343 }
344
345} // namespace core::socket::stream
virtual bool onSignal(int sig)=0
virtual ~Socket()
Definition Socket.hpp:56
Config & getConfig() const
Definition Socket.hpp:60
std::shared_ptr< Config > config
Definition Socket.h:72
Socket(const std::string &name)
Definition Socket.hpp:51
void onReceivedFromPeer(std::size_t available) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
std::size_t getTotalRead() const override
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
core::socket::stream::SocketContext * socketContext
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)
Definition signal.cpp:59