2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/socket/stream/SocketConnection.h"
43#include "core/socket/stream/SocketContext.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49#include "utils/system/signal.h"
58namespace core::socket::
stream {
60 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
62 typename SocketAddress::SockAddr localSockAddr;
63 typename SocketAddress::SockLen localSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
65 SocketAddress localPeerAddress;
66 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
68 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
69 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
70 <<
" PeerAddress (local): " << localPeerAddress.toString();
71 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
72 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
73 <<
" PeerAddress (local): " << badSocketAddress.what();
76 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
77 <<
" PeerAddress (local) not retrievable";
80 return localPeerAddress;
83 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
85 typename SocketAddress::SockAddr remoteSockAddr;
86 typename SocketAddress::SockLen remoteSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
88 SocketAddress remotePeerAddress;
89 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
91 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
92 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
93 <<
" PeerAddress (remote): " << remotePeerAddress.toString();
94 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
95 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
96 <<
" PeerAddress (remote): " << badSocketAddress.what();
99 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
100 <<
" PeerAddress (remote) not retrievble";
103 return remotePeerAddress;
106 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
108 const std::function<
void()>& onDisconnect,
109 const std::shared_ptr<
Config>& config)
126 config->getReadTimeout(),
127 config->getReadBlockSize(),
128 config->getTerminateTimeout())
140 config->getWriteTimeout(),
141 config->getWriteBlockSize(),
142 config->getTerminateTimeout())
157 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
161 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
167 template <
typename PhysicalSocketT,
typename SocketReaderT,
typename SocketWriterT,
typename ConfigT>
171 template <
typename PhysicalSocketT,
typename SocketReaderT,
typename SocketWriterT,
typename ConfigT>
176 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
181 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
187 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
193 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
199 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
206 LOG(TRACE) <<
connectionName <<
" ReadFromPeer: New SocketContext != nullptr: SocketContextSwitch still in progress";
212 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
217 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
222 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
227 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
240 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
256 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
266 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
271 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
276 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
281 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
286 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
291 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
308 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
312 if (available != 0 && consumed == 0) {
313 LOG(TRACE) <<
connectionName <<
": Data available: " << available <<
" but nothing read";
327 LOG(DEBUG) <<
connectionName <<
" SocketConnection: switch completed";
331 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
336 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
341 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
351 LOG(DEBUG) <<
connectionName <<
": Shutting down due to signal '" << strsignal(signum) <<
"' (SIG"
361 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
367 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
373 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
virtual bool onSignal(int sig)=0
Socket(const std::shared_ptr< Config > &config)
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
std::shared_ptr< Config > config
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
SocketWriterT SocketWriter
PhysicalSocketT PhysicalSocket
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
SocketReaderT SocketReader
std::string connectionName
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
std::size_t readFromPeer()
void onReadError(int errnum) override
void onWriteError(int errnum) override
PhysicalSocketT PhysicalSocket
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)