2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/socket/stream/SocketConnection.h"
43#include "core/socket/stream/SocketContext.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49#include "utils/system/signal.h"
57namespace core::socket::
stream {
58 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
60 PhysicalSocket&& physicalSocket,
61 const std::function<
void()>& onDisconnect,
62 const std::string& configuredServer,
63 const SocketAddress& localAddress,
64 const SocketAddress& remoteAddress,
65 const utils::
Timeval& readTimeout,
66 const utils::
Timeval& writeTimeout,
67 std::size_t readBlockSize,
68 std::size_t writeBlockSize,
69 const utils::
Timeval& terminateTimeout)
72 instanceName +
" [" + std::to_string(physicalSocket.getFd()) +
"]",
82 SocketReader::disable();
90 instanceName +
" [" + std::to_string(physicalSocket.getFd()) +
"]",
96 SocketWriter::disable();
112 SocketWriter::suspend();
116 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
120 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
122 SocketReader::setTimeout(timeout);
123 SocketWriter::setTimeout(timeout);
126 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
131 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
132 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::SocketAddress&
137 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
138 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter>::SocketAddress&
143 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
148 ret = SocketReader::readFromPeer(chunk, chunkLen);
150 LOG(TRACE) <<
connectionName <<
" ReadFromPeer: New SocketContext != nullptr: SocketContextSwitch still in progress";
156 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
158 SocketWriter::sendToPeer(chunk, chunkLen);
161 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
163 return SocketWriter::streamToPeer(source);
166 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
168 SocketWriter::streamEof();
171 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
175 SocketReader::shutdownRead();
184 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
186 if (!SocketWriter::shutdownInProgress) {
189 SocketWriter::shutdownWrite([forceClose,
this]() {
190 if (SocketWriter::isEnabled()) {
191 SocketWriter::disable();
193 if (forceClose && SocketReader::isEnabled()) {
200 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
202 if (SocketWriter::isEnabled()) {
203 SocketWriter::disable();
205 if (SocketReader::isEnabled()) {
206 SocketReader::disable();
210 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
227 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
231 if (available != 0 && consumed == 0) {
232 LOG(TRACE) <<
connectionName <<
": Data available: " << available <<
" but nothing read";
245 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
250 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
255 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
265 LOG(DEBUG) <<
connectionName <<
": Shutting down due to signal '" << strsignal(signum) <<
"' (SIG"
275 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
281 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
287 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter>
virtual std::size_t onReceivedFromPeer()=0
virtual bool onSignal(int sig)=0
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
SocketAddress localAddress
SocketAddress remoteAddress
void unobservedEvent() final
void onReadError(int errnum)
void shutdownRead() final
void onReceivedFromPeer(std::size_t available) final
void onWriteError(int errnum)
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
bool onSignal(int signum) final
const SocketAddress & getRemoteAddress() const final
PhysicalSocket physicalSocket
SocketConnectionT(const std::string &instanceName, PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::string &configuredServer, const SocketAddress &localAddress, const SocketAddress &remoteAddress, const utils::Timeval &readTimeout, const utils::Timeval &writeTimeout, std::size_t readBlockSize, std::size_t writeBlockSize, const utils::Timeval &terminateTimeout)
void doWriteShutdown(const std::function< void()> &onShutdown) override
bool streamToPeer(core::pipe::Source *source) final
void shutdownWrite(bool forceClose) final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
std::function< void()> onDisconnect
const SocketAddress & getLocalAddress() const final
void writeTimeout() final
~SocketConnectionT() override
std::string connectionName
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
void disconnectCurrentSocketContext()
void setSocketContext(SocketContext *socketContext)
void onReadError(int errnum) override
void onWriteError(int errnum) override
SocketConnection(const std::string &instanceName, PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::string &configuredServer, const SocketAddress &localAddress, const SocketAddress &remoteAddress, const utils::Timeval &readTimeout, const utils::Timeval &writeTimeout, std::size_t readBlockSize, std::size_t writeBlockSize, const utils::Timeval &terminateTimeout)
PreserveErrno(int newErrno=errno)
std::string sigabbrev_np(int sig)