2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
56namespace core::socket::
stream {
58 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
60 typename SocketAddress::SockAddr localSockAddr;
61 typename SocketAddress::SockLen localSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
63 SocketAddress localPeerAddress;
64 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
66 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
67 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
68 <<
" PeerAddress (local): " << localPeerAddress.toString();
69 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
70 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
71 <<
" PeerAddress (local): " << badSocketAddress.what();
74 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
75 <<
" PeerAddress (local) not retrievable";
78 return localPeerAddress;
81 template <
typename SocketAddress,
typename PhysicalSocket,
typename Config>
83 typename SocketAddress::SockAddr remoteSockAddr;
84 typename SocketAddress::SockLen remoteSockAddrLen =
sizeof(
typename SocketAddress::SockAddr);
86 SocketAddress remotePeerAddress;
87 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
89 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
90 LOG(TRACE) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
91 <<
" PeerAddress (remote): " << remotePeerAddress.toString();
92 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
93 LOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
94 <<
" PeerAddress (remote): " << badSocketAddress.what();
97 PLOG(WARNING) << config->getInstanceName() <<
" [" << physicalSocket.getFd() <<
"]" << std::setw(25)
98 <<
" PeerAddress (remote) not retrievable";
101 return remotePeerAddress;
104 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
107 const std::function<
void(SocketConnection*)>& onConnect,
108 const std::function<
void(SocketConnection*)>& onConnected,
109 const std::function<
void(SocketConnection*)>& onDisconnect,
110 const std::function<
void(
const SocketAddress&, core::socket::
State)>& onStatus,
111 const std::shared_ptr<Config>& config)
128 template <
typename PhysicalSocketServer,
typename Config,
template <
typename PhysicalSocketServerT>
typename SocketConnection>
146 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
150 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
152 if (!
config->getDisabled()) {
154 LOG(TRACE) <<
config->getInstanceName() <<
" Starting";
157 SocketAddress localAddress =
config->Local::getSocketAddress();
160 core::socket::
State state = core::socket::STATE_OK;
168 PLOG(DEBUG) <<
config->getInstanceName() <<
" open: '" << localAddress.toString() <<
"'";
170 state = core::socket::STATE_ERROR;
173 PLOG(DEBUG) <<
config->getInstanceName() <<
" open: '" << localAddress.toString() <<
"'";
175 state = core::socket::STATE_FATAL;
183 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind: '" << localAddress.toString() <<
"'";
185 state = core::socket::STATE_ERROR;
188 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind: '" << localAddress.toString() <<
"'";
190 state = core::socket::STATE_FATAL;
203 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
205 state = core::socket::STATE_ERROR;
208 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
210 state = core::socket::STATE_FATAL;
218 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '"
219 <<
config->Remote::getSocketAddress().toString() <<
"'";
225 }
else if (PhysicalClientSocket::connectInProgress(errno)) {
227 LOG(DEBUG) <<
config->getInstanceName() <<
" connect in progress: '" <<
remoteAddress.toString() <<
"'";
229 LOG(DEBUG) <<
config->getInstanceName() <<
" not enabled: '" <<
remoteAddress.toString() <<
"'";
231 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED,
"SocketConnector not enabled");
239 onStatus(remoteAddress, core::socket::STATE_OK);
241 SocketConnection* socketConnection =
242 new SocketConnection(
config->getInstanceName(),
249 config->getWriteTimeout(),
250 config->getReadBlockSize(),
251 config->getWriteBlockSize(),
252 config->getTerminateTimeout());
257 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
258 LOG(DEBUG) <<
config->getInstanceName() <<
" " << badSocketAddress.what();
260 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
262 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
263 LOG(DEBUG) <<
config->getInstanceName() <<
" " << badSocketAddress.what();
265 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
268 LOG(DEBUG) <<
config->getInstanceName() <<
" disabled";
270 onStatus({}, core::socket::STATE_DISABLED);
280 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
291 onStatus(remoteAddress, core::socket::STATE_OK);
293 SocketConnection* socketConnection =
294 new SocketConnection(
config->getInstanceName(),
301 config->getWriteTimeout(),
302 config->getReadBlockSize(),
303 config->getWriteBlockSize(),
304 config->getTerminateTimeout());
310 }
else if (PhysicalClientSocket::connectInProgress(errno)) {
311 LOG(TRACE) <<
config->getInstanceName() <<
" connect still in progress: '" <<
remoteAddress.toString() <<
"'";
315 core::socket::
State state = core::socket::STATE_OK;
324 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
326 state = core::socket::STATE_ERROR;
329 PLOG(DEBUG) <<
config->getInstanceName() <<
": connect: '" <<
remoteAddress.toString() <<
"'";
331 state = core::socket::STATE_FATAL;
337 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '"
338 <<
config->Remote::getSocketAddress().toString() <<
"'";
344 core::socket::
State state = core::socket::STATE_OK;
353 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
355 state = core::socket::STATE_ERROR;
358 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
360 state = core::socket::STATE_FATAL;
370 PLOG(DEBUG) <<
config->getInstanceName() <<
" getsockopt syscall error: '" <<
remoteAddress.toString() <<
"'";
372 onStatus(remoteAddress, core::socket::STATE_FATAL);
377 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
382 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
384 LOG(TRACE) <<
config->getInstanceName() <<
" connect timeout " <<
remoteAddress.toString();
388 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '" <<
config->Remote::getSocketAddress().toString()
393 LOG(DEBUG) <<
config->getInstanceName() <<
" connect timeout '" <<
remoteAddress.toString() <<
"'";
396 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
402 template <
typename PhysicalSocketClient,
typename Config,
template <
typename PhysicalSocketClientT>
typename SocketConnection>
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual void connectTimeout()
virtual std::size_t onReceivedFromPeer()=0
virtual bool onSignal(int sig)=0
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
static constexpr int NO_RETRY
State operator|(int state)
SocketAddress localAddress
SocketAddress remoteAddress
void unobservedEvent() final
void onReadError(int errnum)
void shutdownRead() final
void onReceivedFromPeer(std::size_t available) final
void onWriteError(int errnum)
void setTimeout(const utils::Timeval &timeout) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
bool onSignal(int signum) final
const SocketAddress & getRemoteAddress() const final
PhysicalSocket physicalSocket
SocketConnectionT(const std::string &instanceName, PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::string &configuredServer, const SocketAddress &localAddress, const SocketAddress &remoteAddress, const utils::Timeval &readTimeout, const utils::Timeval &writeTimeout, std::size_t readBlockSize, std::size_t writeBlockSize, const utils::Timeval &terminateTimeout)
void doWriteShutdown(const std::function< void()> &onShutdown) override
bool streamToPeer(core::pipe::Source *source) final
void shutdownWrite(bool forceClose) final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
std::function< void()> onDisconnect
const SocketAddress & getLocalAddress() const final
void writeTimeout() final
~SocketConnectionT() override
std::string connectionName
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
void disconnectCurrentSocketContext()
void setSocketContext(SocketContext *socketContext)
SocketAddress remoteAddress
virtual void useNextSocketAddress()=0
void connectTimeout() final
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
std::function< void(SocketConnection *)> onConnected
PhysicalClientSocket physicalClientSocket
std::shared_ptr< Config > config
std::shared_ptr< core::socket::stream::SocketContextFactory > socketContextFactory
std::function< void(SocketConnection *)> onDisconnect
std::function< void(SocketConnection *)> onConnect
void unobservedEvent() final
void connectEvent() final
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
~SocketConnector() override
void onReadError(int errnum) override
void onWriteError(int errnum) override
SocketConnection(const std::string &instanceName, PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::string &configuredServer, const SocketAddress &localAddress, const SocketAddress &remoteAddress, const utils::Timeval &readTimeout, const utils::Timeval &writeTimeout, std::size_t readBlockSize, std::size_t writeBlockSize, const utils::Timeval &terminateTimeout)
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
SocketConnector(const SocketConnector &socketConnector)
void useNextSocketAddress() override
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)