2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/socket/stream/SocketConnection.h"
43#include "core/socket/stream/SocketContext.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49#include "utils/system/signal.h"
58namespace core::socket::
stream {
60 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
62 typename SocketAddress::SockAddr localSockAddr;
63 typename SocketAddress::SockLen localSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
65 SocketAddress localPeerAddress;
66 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
68 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
69 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
70 <<
" PeerAddress (local): " << localPeerAddress.toString();
71 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
72 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
73 <<
" PeerAddress (local): " << badSocketAddress.what();
76 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
77 <<
" PeerAddress (local) not retrievable";
80 return localPeerAddress;
83 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
85 typename SocketAddress::SockAddr remoteSockAddr;
86 typename SocketAddress::SockLen remoteSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
88 SocketAddress remotePeerAddress;
89 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
91 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
92 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
93 <<
" PeerAddress (remote): " << remotePeerAddress.toString();
94 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
95 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
96 <<
" PeerAddress (remote): " << badSocketAddress.what();
99 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
100 <<
" PeerAddress (remote) not retrievble";
103 return remotePeerAddress;
106 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
108 const std::function<
void()>& onDisconnectm,
109 const std::shared_ptr<Config>& config)
112 instanceName +
" [" + std::to_string(physicalSocket.getFd()) +
"]",
122 SocketReader::disable();
126 config->getReadTimeout(),
127 config->getReadBlockSize(),
128 config->getTerminateTimeout())
130 instanceName +
" [" + std::to_string(physicalSocket.getFd()) +
"]",
136 SocketWriter::disable();
140 config->getWriteTimeout(),
141 config->getWriteBlockSize(),
142 config->getTerminateTimeout())
153 SocketWriter::suspend();
157 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
161 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
167 template <
typename PhysicalSocketT,
typename SocketReaderT,
typename SocketWriterT,
typename ConfigT>
169 SocketReader::setTimeout(timeout);
171 template <
typename PhysicalSocketT,
typename SocketReaderT,
typename SocketWriterT,
typename ConfigT>
173 SocketWriter::setTimeout(timeout);
176 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
181 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
182 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
187 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
188 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
193 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
194 const typename SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::SocketAddress&
199 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
201 return SocketReader::readFromPeer(chunk, chunkLen);
204 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
206 SocketWriter::sendToPeer(chunk, chunkLen);
209 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
211 return SocketWriter::streamToPeer(source);
214 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
216 SocketWriter::streamEof();
219 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
223 SocketReader::shutdownRead();
232 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
234 if (!SocketWriter::shutdownInProgress) {
237 SocketWriter::shutdownWrite([forceClose,
this]() {
238 if (SocketWriter::isEnabled()) {
239 SocketWriter::disable();
241 if (forceClose && SocketReader::isEnabled()) {
248 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
250 if (SocketWriter::isEnabled()) {
251 SocketWriter::disable();
253 if (SocketReader::isEnabled()) {
254 SocketReader::disable();
258 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
263 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
265 return SocketWriter::getTotalSent();
268 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
270 return SocketWriter::getTotalQueued();
273 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
275 return SocketReader::getTotalRead();
278 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
280 return SocketReader::getTotalProcessed();
283 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
300 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
305 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
310 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
315 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
325 LOG(DEBUG) <<
connectionName <<
": Shutting down due to signal '" << strsignal(signum) <<
"' (SIG"
335 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
341 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
347 template <
typename PhysicalSocket,
typename SocketReader,
typename SocketWriter,
typename Config>
virtual bool onSignal(int sig)=0
Socket(const std::shared_ptr< Config > &config)
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
std::shared_ptr< Config > config
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
std::string connectionName
core::socket::stream::SocketContext * socketContext
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)