2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
56namespace core::socket::
stream {
58 template <
typename PhysicalSocketClient,
60 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
63 const std::function<
void(SocketConnection*)>& onConnect,
64 const std::function<
void(SocketConnection*)>& onConnected,
65 const std::function<
void(SocketConnection*)>& onDisconnect,
66 const std::function<
void(
const SocketAddress&, core::socket::
State)>& onStatus,
67 const std::shared_ptr<Config>& config)
84 template <
typename PhysicalSocketServer,
86 template <
typename ConfigT,
typename PhysicalSocketServerT>
typename SocketConnection>
104 template <
typename PhysicalSocketClient,
106 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
110 template <
typename PhysicalSocketClient,
112 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
114 if (!
config->getDisabled()) {
116 core::socket::
State state = core::socket::STATE_OK;
118 LOG(TRACE) <<
config->getInstanceName() <<
" Starting";
120 SocketAddress bindAddress =
config->Local::getSocketAddress();
126 PLOG(DEBUG) <<
config->getInstanceName() <<
" open " << bindAddress.toString();
133 state = core::socket::STATE_ERROR;
136 state = core::socket::STATE_FATAL;
142 LOG(TRACE) <<
config->getInstanceName() <<
" open " << bindAddress.toString() <<
": success";
145 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind " << bindAddress.toString();
149 state = core::socket::STATE_ERROR;
152 state = core::socket::STATE_FATAL;
158 LOG(TRACE) <<
config->getInstanceName() <<
" bind " << bindAddress.toString() <<
": success";
169 state = core::socket::STATE_ERROR;
172 state = core::socket::STATE_FATAL;
180 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: " <<
remoteAddress.toString();
187 LOG(TRACE) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
189 if (PhysicalClientSocket::connectInProgress(errno)) {
192 <<
config->getInstanceName() <<
" enable " <<
remoteAddress.toString(
false) <<
": success";
195 <<
": failed. No valid descriptor created";
197 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED,
"SocketConnector not enabled");
202 SocketConnection* socketConnection =
205 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
206 LOG(DEBUG) <<
" " << socketConnection->getLocalAddress().toString() <<
" -> "
207 << socketConnection->getRemoteAddress().toString();
217 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
218 core::socket::
State state =
219 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
225 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
226 core::socket::
State state =
227 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
234 LOG(DEBUG) <<
config->getInstanceName() <<
": disabled";
236 onStatus({}, core::socket::STATE_DISABLED);
246 template <
typename PhysicalSocketClient,
248 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
258 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
259 LOG(DEBUG) <<
" " << socketConnection->getLocalAddress().toString() <<
" -> "
260 << socketConnection->getRemoteAddress().toString();
262 onStatus(remoteAddress, core::socket::STATE_OK);
268 }
else if (PhysicalClientSocket::connectInProgress(errno)) {
269 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": in progress:";
273 core::socket::
State state = core::socket::STATE_OK;
282 state = core::socket::STATE_ERROR;
285 state = core::socket::STATE_FATAL;
294 LOG(DEBUG) <<
config->getInstanceName()
295 <<
" using next SocketAddress: " <<
config->Remote::getSocketAddress().toString();
309 PLOG(DEBUG) <<
config->getInstanceName() <<
" getsockopt syscall error: '" <<
remoteAddress.toString() <<
"'";
311 onStatus(remoteAddress, core::socket::STATE_FATAL);
316 template <
typename PhysicalSocketClient,
318 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
323 template <
typename PhysicalSocketClient,
325 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
327 LOG(TRACE) <<
config->getInstanceName() <<
" connect timeout " <<
remoteAddress.toString();
331 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '" <<
config->Remote::getSocketAddress().toString()
336 LOG(DEBUG) <<
config->getInstanceName() <<
" connect timeout '" <<
remoteAddress.toString() <<
"'";
339 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
345 template <
typename PhysicalSocketClient,
347 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual void connectTimeout()
virtual bool onSignal(int sig)=0
Socket(const std::shared_ptr< Config > &config)
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
static constexpr int NO_RETRY
State operator|(int state)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnectm, const std::shared_ptr< Config > &config)
std::size_t getTotalSent() const override
std::shared_ptr< Config > config
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
std::string connectionName
core::socket::stream::SocketContext * socketContext
SocketAddress remoteAddress
virtual void useNextSocketAddress()=0
void connectTimeout() final
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
std::function< void(SocketConnection *)> onConnected
PhysicalClientSocket physicalClientSocket
std::shared_ptr< Config > config
std::shared_ptr< core::socket::stream::SocketContextFactory > socketContextFactory
std::function< void(SocketConnection *)> onDisconnect
std::function< void(SocketConnection *)> onConnect
void unobservedEvent() final
void connectEvent() final
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
~SocketConnector() override
void onReadError(int errnum) override
void onWriteError(int errnum) override
void readFromPeer(std::size_t available)
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
SocketConnector(const SocketConnector &socketConnector)
void useNextSocketAddress() override
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)