2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
56namespace core::socket::
stream {
58 template <
typename PhysicalSocketClient,
60 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
63 const std::function<
void(SocketConnection*)>& onConnect,
64 const std::function<
void(SocketConnection*)>& onConnected,
65 const std::function<
void(SocketConnection*)>& onDisconnect,
66 const std::function<
void(
const SocketAddress&, core::socket::
State)>& onStatus,
67 const std::shared_ptr<Config>& config)
84 template <
typename PhysicalSocketServer,
86 template <
typename ConfigT,
typename PhysicalSocketServerT>
typename SocketConnection>
104 template <
typename PhysicalSocketClient,
106 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
110 template <
typename PhysicalSocketClient,
112 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
114 if (!
config->getDisabled()) {
116 LOG(TRACE) <<
config->getInstanceName() <<
" Starting";
119 SocketAddress localAddress =
config->Local::getSocketAddress();
122 core::socket::
State state = core::socket::STATE_OK;
130 PLOG(DEBUG) <<
config->getInstanceName() <<
" open: '" << localAddress.toString() <<
"'";
132 state = core::socket::STATE_ERROR;
135 PLOG(DEBUG) <<
config->getInstanceName() <<
" open: '" << localAddress.toString() <<
"'";
137 state = core::socket::STATE_FATAL;
145 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind: '" << localAddress.toString() <<
"'";
147 state = core::socket::STATE_ERROR;
150 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind: '" << localAddress.toString() <<
"'";
152 state = core::socket::STATE_FATAL;
165 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
167 state = core::socket::STATE_ERROR;
170 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
172 state = core::socket::STATE_FATAL;
180 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '"
181 <<
config->Remote::getSocketAddress().toString() <<
"'";
187 }
else if (PhysicalClientSocket::connectInProgress(errno)) {
189 LOG(DEBUG) <<
config->getInstanceName() <<
" connect in progress: '" <<
remoteAddress.toString() <<
"'";
191 LOG(DEBUG) <<
config->getInstanceName() <<
" not enabled: '" <<
remoteAddress.toString() <<
"'";
193 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED,
"SocketConnector not enabled");
201 onStatus(remoteAddress, core::socket::STATE_OK);
208 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
209 LOG(DEBUG) <<
config->getInstanceName() <<
" " << badSocketAddress.what();
211 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
213 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
214 LOG(DEBUG) <<
config->getInstanceName() <<
" " << badSocketAddress.what();
216 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
219 LOG(DEBUG) <<
config->getInstanceName() <<
" disabled";
221 onStatus({}, core::socket::STATE_DISABLED);
231 template <
typename PhysicalSocketClient,
233 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
244 onStatus(remoteAddress, core::socket::STATE_OK);
252 }
else if (PhysicalClientSocket::connectInProgress(errno)) {
253 LOG(TRACE) <<
config->getInstanceName() <<
" connect still in progress: '" <<
remoteAddress.toString() <<
"'";
257 core::socket::
State state = core::socket::STATE_OK;
266 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
268 state = core::socket::STATE_ERROR;
271 PLOG(DEBUG) <<
config->getInstanceName() <<
": connect: '" <<
remoteAddress.toString() <<
"'";
273 state = core::socket::STATE_FATAL;
279 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '"
280 <<
config->Remote::getSocketAddress().toString() <<
"'";
286 core::socket::
State state = core::socket::STATE_OK;
295 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
297 state = core::socket::STATE_ERROR;
300 PLOG(DEBUG) <<
config->getInstanceName() <<
" connect: '" <<
remoteAddress.toString() <<
"'";
302 state = core::socket::STATE_FATAL;
312 PLOG(DEBUG) <<
config->getInstanceName() <<
" getsockopt syscall error: '" <<
remoteAddress.toString() <<
"'";
314 onStatus(remoteAddress, core::socket::STATE_FATAL);
319 template <
typename PhysicalSocketClient,
321 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
326 template <
typename PhysicalSocketClient,
328 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
330 LOG(TRACE) <<
config->getInstanceName() <<
" connect timeout " <<
remoteAddress.toString();
334 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '" <<
config->Remote::getSocketAddress().toString()
339 LOG(DEBUG) <<
config->getInstanceName() <<
" connect timeout '" <<
remoteAddress.toString() <<
"'";
342 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
348 template <
typename PhysicalSocketClient,
350 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual void connectTimeout()
virtual bool onSignal(int sig)=0
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
static constexpr int NO_RETRY
State operator|(int state)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
std::shared_ptr< Config > config
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
std::string connectionName
core::socket::stream::SocketContext * socketContext
SocketAddress remoteAddress
virtual void useNextSocketAddress()=0
void connectTimeout() final
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
std::function< void(SocketConnection *)> onConnected
PhysicalClientSocket physicalClientSocket
std::shared_ptr< Config > config
std::shared_ptr< core::socket::stream::SocketContextFactory > socketContextFactory
std::function< void(SocketConnection *)> onDisconnect
std::function< void(SocketConnection *)> onConnect
void unobservedEvent() final
void connectEvent() final
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
~SocketConnector() override
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
SocketConnector(const SocketConnector &socketConnector)
void useNextSocketAddress() override
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)