SNode.C
Loading...
Searching...
No Matches
SocketConnection.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "core/socket/stream/SocketConnection.h"
21#include "core/socket/stream/SocketContext.h"
22
23#ifndef DOXYGEN_SHOULD_SKIP_THIS
24
25#include "log/Logger.h"
26#include "utils/PreserveErrno.h"
27#include "utils/system/signal.h"
28
29#include <cstddef>
30#include <string>
31#include <utility>
32
33#endif /* DOXYGEN_SHOULD_SKIP_THIS */
34
35namespace core::socket::stream {
36 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
37 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::SocketConnectionT(const std::string& instanceName,
38 PhysicalSocket&& physicalSocket,
39 const std::function<void()>& onDisconnect,
40 const std::string& configuredServer,
41 const SocketAddress& localAddress,
42 const SocketAddress& remoteAddress,
43 const utils::Timeval& readTimeout,
44 const utils::Timeval& writeTimeout,
45 std::size_t readBlockSize,
46 std::size_t writeBlockSize,
47 const utils::Timeval& terminateTimeout)
48 : SocketConnection(instanceName, physicalSocket.getFd(), configuredServer)
49 , SocketReader(
50 instanceName + " [" + std::to_string(physicalSocket.getFd()) + "]",
51 [this](int errnum) {
52 {
53 const utils::PreserveErrno pe(errnum);
54 if (errno == 0) {
55 LOG(TRACE) << connectionName << " OnReadError: EOF received";
56 } else {
57 PLOG(TRACE) << connectionName << " OnReadError";
58 }
59 }
60 SocketReader::disable();
61
62 onReadError(errnum);
63 },
64 readTimeout,
65 readBlockSize,
66 terminateTimeout)
67 , SocketWriter(
68 instanceName + " [" + std::to_string(physicalSocket.getFd()) + "]",
69 [this](int errnum) {
70 {
71 const utils::PreserveErrno pe(errnum);
72 PLOG(TRACE) << connectionName << " OnWriteError";
73 }
74 SocketWriter::disable();
75
76 onWriteError(errnum);
77 },
78 writeTimeout,
79 writeBlockSize,
80 terminateTimeout)
81 , physicalSocket(std::move(physicalSocket))
82 , onDisconnect(onDisconnect)
83 , localAddress(localAddress)
84 , remoteAddress(remoteAddress) {
85 if (!SocketReader::enable(this->physicalSocket.getFd())) {
86 delete this;
87 } else if (!SocketWriter::enable(this->physicalSocket.getFd())) {
88 delete this;
89 } else {
90 SocketWriter::suspend();
91 }
92 }
93
94 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
95 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::~SocketConnectionT() {
96 }
97
98 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
99 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::setTimeout(const utils::Timeval& timeout) {
100 SocketReader::setTimeout(timeout);
101 SocketWriter::setTimeout(timeout);
102 }
103
104 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
105 int SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::getFd() const {
106 return physicalSocket.getFd();
107 }
108
109 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
110 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::SocketAddress&
111 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::getLocalAddress() const {
112 return localAddress;
113 }
114
115 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
116 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::SocketAddress&
117 SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::getRemoteAddress() const {
118 return remoteAddress;
119 }
120
121 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
122 std::size_t SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::readFromPeer(char* chunk, std::size_t chunkLen) {
123 std::size_t ret = 0;
124
125 if (newSocketContext == nullptr) {
126 ret = SocketReader::readFromPeer(chunk, chunkLen);
127 } else {
128 LOG(TRACE) << connectionName << " ReadFromPeer: New SocketContext != nullptr: SocketContextSwitch still in progress";
129 }
130
131 return ret;
132 }
133
134 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
135 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::sendToPeer(const char* chunk, std::size_t chunkLen) {
136 SocketWriter::sendToPeer(chunk, chunkLen);
137 }
138
139 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
140 bool SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::streamToPeer(core::pipe::Source* source) {
141 return SocketWriter::streamToPeer(source);
142 }
143
144 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
145 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::streamEof() {
146 SocketWriter::streamEof();
147 }
148
149 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
150 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::shutdownRead() {
151 LOG(TRACE) << connectionName << ": Shutdown (RD)";
152
153 SocketReader::shutdownRead();
154
155 if (physicalSocket.shutdown(PhysicalSocket::SHUT::RD) == 0) {
156 LOG(DEBUG) << connectionName << " Shutdown (RD): success";
157 } else {
158 PLOG(ERROR) << connectionName << " Shutdown (RD)";
159 }
160 }
161
162 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
163 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::shutdownWrite(bool forceClose) {
164 if (!SocketWriter::shutdownInProgress) {
165 LOG(TRACE) << connectionName << ": Stop writing";
166
167 SocketWriter::shutdownWrite([forceClose, this]() {
168 if (SocketWriter::isEnabled()) {
169 SocketWriter::disable();
170 }
171 if (forceClose && SocketReader::isEnabled()) {
172 close();
173 }
174 });
175 }
176 }
177
178 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
179 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::close() {
180 if (SocketWriter::isEnabled()) {
181 SocketWriter::disable();
182 }
183 if (SocketReader::isEnabled()) {
184 SocketReader::disable();
185 }
186 }
187
188 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
189 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::doWriteShutdown(const std::function<void()>& onShutdown) {
190 errno = 0;
191
192 setTimeout(SocketWriter::terminateTimeout);
193
194 LOG(TRACE) << connectionName << ": Shutdown (WR)";
195
196 if (physicalSocket.shutdown(PhysicalSocket::SHUT::WR) == 0) {
197 LOG(DEBUG) << connectionName << " Shutdown (WR): success";
198 } else {
199 PLOG(ERROR) << connectionName << " Shutdown (WR)";
200 }
201
202 onShutdown();
203 }
204
205 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
206 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::onReceivedFromPeer(std::size_t available) {
207 std::size_t consumed = socketContext->onReceivedFromPeer();
208
209 if (available != 0 && consumed == 0) {
210 LOG(TRACE) << connectionName << ": Data available: " << available << " but nothing read";
211
212 close();
213
214 delete newSocketContext; // delete of nullptr is valid since C++14!
215 newSocketContext = nullptr;
216 } else if (newSocketContext != nullptr) { // Perform a pending SocketContextSwitch
218 setSocketContext(newSocketContext);
219 newSocketContext = nullptr;
220 }
221 }
222
223 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
224 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::onWriteError(int errnum) {
225 socketContext->onWriteError(errnum);
226 }
227
228 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
229 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::onReadError(int errnum) {
230 socketContext->onReadError(errnum);
231 }
232
233 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
234 bool SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::onSignal(int signum) {
235 switch (signum) {
236 case SIGINT:
237 [[fallthrough]];
238 case SIGTERM:
239 [[fallthrough]];
240 case SIGABRT:
241 [[fallthrough]];
242 case SIGHUP:
243 LOG(DEBUG) << connectionName << ": Shutting down due to signal '" << strsignal(signum) << "' (SIG"
244 << utils::system::sigabbrev_np(signum) << " [" << signum << "])";
245 break;
246 case SIGALRM:
247 break;
248 }
249
250 return socketContext != nullptr ? socketContext->onSignal(signum) : true;
251 }
252
253 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
254 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::readTimeout() {
255 LOG(WARNING) << connectionName << ": Read timeout";
256 close();
257 }
258
259 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
260 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::writeTimeout() {
261 LOG(WARNING) << connectionName << ": Write timeout";
262 close();
263 }
264
265 template <typename PhysicalSocket, typename SocketReader, typename SocketWriter>
266 void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::unobservedEvent() {
268
269 onDisconnect();
270
271 delete this;
272 }
273
274} // namespace core::socket::stream
virtual ~Socket()
Definition Socket.hpp:34
Config & getConfig() const
Definition Socket.hpp:38
std::shared_ptr< Config > config
Definition Socket.h:50
Socket(const std::string &name)
Definition Socket.hpp:29
void onReceivedFromPeer(std::size_t available) final
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
const SocketAddress & getRemoteAddress() const final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void shutdownWrite(bool forceClose) final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
const SocketAddress & getLocalAddress() const final
SocketConnection(const std::string &instanceName, PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::string &configuredServer, const SocketAddress &localAddress, const SocketAddress &remoteAddress, const utils::Timeval &readTimeout, const utils::Timeval &writeTimeout, std::size_t readBlockSize, std::size_t writeBlockSize, const utils::Timeval &terminateTimeout)
Definition Config.h:37