2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
56namespace core::socket::
stream {
58 template <
typename PhysicalSocketClient,
60 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
66 const std::shared_ptr<
Config>& config)
82 template <
typename PhysicalSocketServer,
84 template <
typename ConfigT,
typename PhysicalSocketServerT>
typename SocketConnection>
101 template <
typename PhysicalSocketClient,
103 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
107 template <
typename PhysicalSocketClient,
109 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
111 if (!
config->getDisabled()) {
113 core::socket::
State state = core::socket::STATE_OK;
115 LOG(TRACE) <<
config->getInstanceName() <<
" Starting";
123 PLOG(DEBUG) <<
config->getInstanceName() <<
" open " << bindAddress.toString();
130 state = core::socket::STATE_ERROR;
133 state = core::socket::STATE_FATAL;
139 LOG(TRACE) <<
config->getInstanceName() <<
" open " << bindAddress.toString() <<
": success";
142 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind " << bindAddress.toString();
146 state = core::socket::STATE_ERROR;
149 state = core::socket::STATE_FATAL;
155 LOG(TRACE) <<
config->getInstanceName() <<
" bind " << bindAddress.toString() <<
": success";
166 state = core::socket::STATE_ERROR;
169 state = core::socket::STATE_FATAL;
177 LOG(INFO) <<
config->getInstanceName() <<
": Using next SocketAddress: " <<
remoteAddress.toString();
184 LOG(TRACE) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
189 <<
config->getInstanceName() <<
" enable " <<
remoteAddress.toString(
false) <<
": success";
192 <<
": failed. No valid descriptor created";
194 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED,
"SocketConnector not enabled");
202 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
203 LOG(DEBUG) <<
" " << socketConnection->getLocalAddress().toString() <<
" -> "
204 << socketConnection->getRemoteAddress().toString();
214 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
215 core::socket::
State state =
216 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
222 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
223 core::socket::
State state =
224 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
231 LOG(DEBUG) <<
config->getInstanceName() <<
": disabled";
233 onStatus({}, core::socket::STATE_DISABLED);
243 template <
typename PhysicalSocketClient,
245 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
255 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
256 LOG(DEBUG) <<
" " << socketConnection->getLocalAddress().toString() <<
" -> "
257 << socketConnection->getRemoteAddress().toString();
259 onStatus(remoteAddress, core::socket::STATE_OK);
266 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": in progress:";
270 core::socket::
State state = core::socket::STATE_OK;
279 state = core::socket::STATE_ERROR;
282 state = core::socket::STATE_FATAL;
291 LOG(DEBUG) <<
config->getInstanceName()
292 <<
" using next SocketAddress: " <<
config->Remote::getSocketAddress().toString();
306 PLOG(DEBUG) <<
config->getInstanceName() <<
" getsockopt syscall error: '" <<
remoteAddress.toString() <<
"'";
308 onStatus(remoteAddress, core::socket::STATE_FATAL);
313 template <
typename PhysicalSocketClient,
315 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
320 template <
typename PhysicalSocketClient,
322 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
324 LOG(TRACE) <<
config->getInstanceName() <<
" connect timeout " <<
remoteAddress.toString();
328 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '" <<
config->Remote::getSocketAddress().toString()
333 LOG(DEBUG) <<
config->getInstanceName() <<
" connect timeout '" <<
remoteAddress.toString() <<
"'";
336 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
342 template <
typename PhysicalSocketClient,
344 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
virtual void connectTimeout()
virtual bool onSignal(int sig)=0
Socket(const std::shared_ptr< Config > &config)
Config & getConfig() const
std::shared_ptr< Config > config
Socket(const std::string &name)
static constexpr int NO_RETRY
State operator|(int state)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
std::shared_ptr< Config > config
const SocketAddress & getBindAddress() const final
void shutdownWrite(bool forceClose) final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
SocketWriterT SocketWriter
PhysicalSocketT PhysicalSocket
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
SocketReaderT SocketReader
std::string connectionName
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
typename PhysicalClientSocket::SocketAddress SocketAddress
SocketAddress remoteAddress
virtual void useNextSocketAddress()=0
void connectTimeout() final
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
SocketConnectionT< PhysicalClientSocket, Config > SocketConnection
std::function< void(SocketConnection *)> onConnected
PhysicalClientSocket physicalClientSocket
std::shared_ptr< Config > config
std::function< void(SocketConnection *)> onDisconnect
PhysicalSocketClientT PhysicalClientSocket
std::function< void(SocketConnection *)> onConnect
void unobservedEvent() final
void connectEvent() final
SocketConnector(const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
~SocketConnector() override
std::size_t readFromPeer()
void onReadError(int errnum) override
void onWriteError(int errnum) override
PhysicalSocketT PhysicalSocket
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
typename Super::SocketAddress SocketAddress
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
typename Super::Config Config
core::socket::stream::SocketConnector< PhysicalClientSocketT, ConfigT, core::socket::stream::legacy::SocketConnection > Super
SocketConnector(const SocketConnector &socketConnector)
void useNextSocketAddress() override
typename Super::SocketConnection SocketConnection
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)