2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/socket/stream/SocketConnection.h"
43#include "core/socket/stream/SocketContext.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49#include "utils/system/signal.h"
58namespace core::socket::
stream {
60 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
62 typename SocketAddress::SockAddr localSockAddr;
63 typename SocketAddress::SockLen localSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
65 SocketAddress localPeerAddress;
66 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
68 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
69 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
70 <<
" PeerAddress (local): " << localPeerAddress.toString();
71 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
72 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
73 <<
" PeerAddress (local): " << badSocketAddress.what();
76 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
77 <<
" PeerAddress (local) not retrievable";
80 return localPeerAddress;
83 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
85 typename SocketAddress::SockAddr remoteSockAddr;
86 typename SocketAddress::SockLen remoteSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
88 SocketAddress remotePeerAddress;
89 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
91 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
92 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
93 <<
" PeerAddress (remote): " << remotePeerAddress.toString();
94 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
95 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
96 <<
" PeerAddress (remote): " << badSocketAddress.what();
99 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
100 <<
" PeerAddress (remote) not retrievble";
103 return remotePeerAddress;
106 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
108 const std::function<
void()>& onDisconnectm,
109 const std::shared_ptr<Config>& config)
112 instanceName +
" [" + std::to_string(physicalSocket.getFd()) +
"]",
122 SocketReader::disable();
126 config->getReadTimeout(),
127 config->getReadBlockSize(),
128 config->getTerminateTimeout())
130 instanceName +
" [" + std::to_string(physicalSocket.getFd()) +
"]",
136 SocketWriter::disable();
140 config->getWriteTimeout(),
141 config->getWriteBlockSize(),
142 config->getTerminateTimeout())
153 SocketWriter::suspend();
157 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
161 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
163 SocketReader::setTimeout(timeout);
164 SocketWriter::setTimeout(timeout);
167 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
172 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
173 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
178 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
179 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
184 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
186 return SocketReader::readFromPeer(chunk, chunkLen);
189 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
191 SocketWriter::sendToPeer(chunk, chunkLen);
194 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
196 return SocketWriter::streamToPeer(source);
199 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
201 SocketWriter::streamEof();
204 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
208 SocketReader::shutdownRead();
217 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
219 if (!SocketWriter::shutdownInProgress) {
222 SocketWriter::shutdownWrite([forceClose,
this]() {
223 if (SocketWriter::isEnabled()) {
224 SocketWriter::disable();
226 if (forceClose && SocketReader::isEnabled()) {
233 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
235 if (SocketWriter::isEnabled()) {
236 SocketWriter::disable();
238 if (SocketReader::isEnabled()) {
239 SocketReader::disable();
243 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
248 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
250 return SocketWriter::getTotalSent();
253 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
255 return SocketWriter::getTotalQueued();
258 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
260 return SocketReader::getTotalRead();
263 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
265 return SocketReader::getTotalProcessed();
268 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
285 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
290 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
295 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
300 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
310 LOG(DEBUG) <<
connectionName <<
": Shutting down due to signal '" << strsignal(signum) <<
"' (SIG"
320 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
326 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
332 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
virtual bool onSignal(int sig)=0
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
std::shared_ptr< Config > config
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
std::string connectionName
core::socket::stream::SocketContext * socketContext
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)