2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "core/State.h"
43#include "core/socket/stream/SocketConnector.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
56namespace core::socket::
stream {
58 template <
typename PhysicalSocketClient,
60 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
67 const std::shared_ptr<
Config>& config)
77 template <
typename PhysicalSocketServer,
79 template <
typename ConfigT,
typename PhysicalSocketServerT>
typename SocketConnection>
90 template <
typename PhysicalSocketClient,
92 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
96 template <
typename PhysicalSocketClient,
98 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
100 if (!
config->getDisabled()) {
102 core::socket::
State state = core::socket::STATE_OK;
104 LOG(DEBUG) <<
config->getInstanceName() <<
" Connect: starting";
112 PLOG(DEBUG) <<
config->getInstanceName() <<
" open " << bindAddress.toString();
119 state = core::socket::STATE_ERROR;
122 state = core::socket::STATE_FATAL;
128 LOG(TRACE) <<
config->getInstanceName() <<
" open " << bindAddress.toString() <<
": success";
131 PLOG(DEBUG) <<
config->getInstanceName() <<
" bind " << bindAddress.toString();
135 state = core::socket::STATE_ERROR;
138 state = core::socket::STATE_FATAL;
144 LOG(TRACE) <<
config->getInstanceName() <<
" bind " << bindAddress.toString() <<
": success";
155 state = core::socket::STATE_ERROR;
158 state = core::socket::STATE_FATAL;
166 LOG(INFO) <<
config->getInstanceName() <<
": Using next SocketAddress: " <<
remoteAddress.toString();
173 LOG(TRACE) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
178 <<
config->getInstanceName() <<
" enable " <<
remoteAddress.toString(
false) <<
": success";
181 <<
": failed. No valid descriptor created";
183 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED,
"SocketConnector not enabled");
191 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
192 LOG(DEBUG) <<
" " << socketConnection->getLocalAddress().toString() <<
" -> "
193 << socketConnection->getRemoteAddress().toString();
203 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
204 core::socket::
State state =
205 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
211 }
catch (
const typename SocketAddress::BadSocketAddress& badSocketAddress) {
212 core::socket::
State state =
213 core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what());
220 LOG(DEBUG) <<
config->getInstanceName() <<
": disabled";
222 onStatus({}, core::socket::STATE_DISABLED);
233 template <
typename PhysicalSocketClient,
235 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
245 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": success";
246 LOG(DEBUG) <<
" " << socketConnection->getLocalAddress().toString() <<
" -> "
247 << socketConnection->getRemoteAddress().toString();
256 LOG(DEBUG) <<
config->getInstanceName() <<
" connect " <<
remoteAddress.toString() <<
": in progress:";
260 core::socket::
State state = core::socket::STATE_OK;
269 state = core::socket::STATE_ERROR;
272 state = core::socket::STATE_FATAL;
281 LOG(DEBUG) <<
config->getInstanceName()
282 <<
" using next SocketAddress: " <<
config->Remote::getSocketAddress().toString();
296 PLOG(DEBUG) <<
config->getInstanceName() <<
" getsockopt syscall error: '" <<
remoteAddress.toString() <<
"'";
303 template <
typename PhysicalSocketClient,
305 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
310 template <
typename PhysicalSocketClient,
312 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
314 LOG(TRACE) <<
config->getInstanceName() <<
" connect timeout " <<
remoteAddress.toString();
318 LOG(DEBUG) <<
config->getInstanceName() <<
" using next SocketAddress: '" <<
config->Remote::getSocketAddress().toString()
323 LOG(DEBUG) <<
config->getInstanceName() <<
" connect timeout '" <<
remoteAddress.toString() <<
"'";
326 onStatus(currentRemoteAddress, core::socket::STATE_ERROR);
332 template <
typename PhysicalSocketClient,
334 template <
typename ConfigT,
typename PhysicalSocketClientT>
typename SocketConnection>
336 if (!
config->getDisabled()) {
void setTimeout(const utils::Timeval &timeout)
virtual void connectTimeout()
virtual bool onSignal(int sig)=0
const std::shared_ptr< Config > config
Socket(const std::shared_ptr< Config > &config)
Config & getConfig() const
Socket(const std::string &name)
static constexpr int NO_RETRY
State operator|(int state)
SocketAddress remoteAddress
void onReceivedFromPeer(std::size_t available) final
void setReadTimeout(const utils::Timeval &timeout) final
bool streamToPeer(core::pipe::Source *source) final
std::size_t getTotalSent() const override
SocketConnectionT(PhysicalSocket &&physicalSocket, const std::function< void()> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnection Super
std::shared_ptr< Config > config
const SocketAddress & getBindAddress() const final
void doWriteShutdown(const std::function< void()> &onShutdown) override
void shutdownWrite() final
void writeTimeout() final
const SocketAddress & getRemoteAddress() const final
void sendToPeer(const char *chunk, std::size_t chunkLen) final
void setWriteTimeout(const utils::Timeval &timeout) final
std::function< void()> onDisconnect
void setTimeout(const utils::Timeval &timeout) final
PhysicalSocket physicalSocket
void onReadError(int errnum)
bool onSignal(int signum) final
std::size_t readFromPeer(char *chunk, std::size_t chunkLen) final
std::size_t getTotalProcessed() const override
SocketAddress localAddress
Config & getConfig() const
std::size_t getTotalRead() const override
void unobservedEvent() final
void onWriteError(int errnum)
~SocketConnectionT() override
void shutdownRead() final
SocketWriterT SocketWriter
PhysicalSocketT PhysicalSocket
typename PhysicalSocket::SocketAddress SocketAddress
const SocketAddress & getLocalAddress() const final
std::size_t getTotalQueued() const override
SocketReaderT SocketReader
std::string connectionName
core::socket::stream::SocketContext * newSocketContext
core::socket::stream::SocketContext * socketContext
const std::string & getConnectionName() const
SocketConnector(const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(core::eventreceiver::ConnectEventReceiver *)> &onInitState, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
typename PhysicalClientSocket::SocketAddress SocketAddress
SocketAddress remoteAddress
virtual void useNextSocketAddress()=0
void connectTimeout() final
SocketConnector(const SocketConnector &socketConnector)
std::function< void(const SocketAddress &, core::socket::State)> onStatus
SocketConnectionT< PhysicalClientSocket, Config > SocketConnection
std::function< void(SocketConnection *)> onConnected
PhysicalClientSocket physicalClientSocket
std::shared_ptr< Config > config
std::function< void(SocketConnection *)> onDisconnect
PhysicalSocketClientT PhysicalClientSocket
std::function< void(core::eventreceiver::ConnectEventReceiver *)> onInitState
std::function< void(SocketConnection *)> onConnect
void unobservedEvent() final
void connectEvent() final
~SocketConnector() override
std::size_t readFromPeer()
void onReadError(int errnum) override
void onWriteError(int errnum) override
PhysicalSocketT PhysicalSocket
SocketConnection(PhysicalSocket &&physicalSocket, const std::function< void(SocketConnection *)> &onDisconnect, const std::shared_ptr< Config > &config)
core::socket::stream::SocketConnectionT< PhysicalSocketT, core::socket::stream::legacy::SocketReader, core::socket::stream::legacy::SocketWriter, ConfigT > Super
typename Super::SocketAddress SocketAddress
SocketConnector(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(core::eventreceiver::ConnectEventReceiver *)> &onInitState, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
typename Super::Config Config
core::socket::stream::SocketConnector< PhysicalClientSocketT, ConfigT, core::socket::stream::legacy::SocketConnection > Super
SocketConnector(const SocketConnector &socketConnector)
void useNextSocketAddress() override
typename Super::SocketConnection SocketConnection
PreserveErrno(int newErrno=errno)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
std::string sigabbrev_np(int sig)