SNode.C
Loading...
Searching...
No Matches
SocketAcceptor.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "core/State.h"
43#include "core/socket/stream/SocketAcceptor.h"
44
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
46
47#include "log/Logger.h"
48#include "utils/PreserveErrno.h"
49
50#include <iomanip>
51#include <string>
52#include <utility>
53
54#endif // DOXYGEN_SHOULD_SKIP_THIS
55
56namespace core::socket::stream {
57
58 template <typename SocketAddress, typename PhysicalSocket, typename Config>
59 SocketAddress getLocalSocketAddress(PhysicalSocket& physicalSocket, Config& config) {
60 typename SocketAddress::SockAddr localSockAddr;
61 typename SocketAddress::SockLen localSockAddrLen = sizeof(typename SocketAddress::SockAddr);
62
63 SocketAddress localPeerAddress;
64 if (physicalSocket.getSockName(localSockAddr, localSockAddrLen) == 0) {
65 try {
66 localPeerAddress = config->Local::getSocketAddress(localSockAddr, localSockAddrLen);
67 LOG(TRACE) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
68 << " PeerAddress (local): " << localPeerAddress.toString();
69 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
70 LOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
71 << " PeerAddress (local): " << badSocketAddress.what();
72 }
73 } else {
74 PLOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
75 << " PeerAddress (local) not retrievable";
76 }
78 return localPeerAddress;
79 }
80
81 template <typename SocketAddress, typename PhysicalSocket, typename Config>
82 SocketAddress getRemoteSocketAddress(PhysicalSocket& physicalSocket, Config& config) {
83 typename SocketAddress::SockAddr remoteSockAddr;
84 typename SocketAddress::SockLen remoteSockAddrLen = sizeof(typename SocketAddress::SockAddr);
85
86 SocketAddress remotePeerAddress;
87 if (physicalSocket.getPeerName(remoteSockAddr, remoteSockAddrLen) == 0) {
88 try {
89 remotePeerAddress = config->Remote::getSocketAddress(remoteSockAddr, remoteSockAddrLen);
90 LOG(TRACE) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
91 << " PeerAddress (remote): " << remotePeerAddress.toString();
92 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
93 LOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
94 << " PeerAddress (remote): " << badSocketAddress.what();
95 }
96 } else {
97 PLOG(WARNING) << config->getInstanceName() << " [" << physicalSocket.getFd() << "]" << std::setw(25)
98 << " PeerAddress (remote) not retrievable";
99 }
100
101 return remotePeerAddress;
102 }
103
104 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
105 SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::SocketAcceptor(
106 const std::shared_ptr<core::socket::stream::SocketContextFactory>& socketContextFactory,
107 const std::function<void(SocketConnection*)>& onConnect,
108 const std::function<void(SocketConnection*)>& onConnected,
109 const std::function<void(SocketConnection*)>& onDisconnect,
110 const std::function<void(const SocketAddress&, core::socket::State)>& onStatus,
111 const std::shared_ptr<Config>& config)
112 : core::eventreceiver::AcceptEventReceiver(config->getInstanceName() + " SocketAcceptor", 0)
113 , socketContextFactory(socketContextFactory)
114 , onConnect(onConnect)
115 , onConnected(onConnected)
116 , onDisconnect(onDisconnect)
117 , onStatus(onStatus)
118 , config(config) {
119 atNextTick([this]() {
120 if (core::eventLoopState() == core::State::RUNNING) {
121 init();
122 } else {
124 }
125 });
126 }
127
128 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
129 SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::SocketAcceptor(const SocketAcceptor& socketAcceptor)
130 : core::eventreceiver::AcceptEventReceiver(socketAcceptor.config->getInstanceName() + " SocketAcceptor", 0)
132 , onConnect(socketAcceptor.onConnect)
133 , onConnected(socketAcceptor.onConnected)
134 , onDisconnect(socketAcceptor.onDisconnect)
135 , onStatus(socketAcceptor.onStatus)
136 , config(socketAcceptor.config) {
137 atNextTick([this]() {
138 if (core::eventLoopState() == core::State::RUNNING) {
139 init();
140 } else {
142 }
143 });
144 }
145
146 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
147 SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::~SocketAcceptor() {
148 }
149
150 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
151 void SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::init() {
152 if (!config->getDisabled()) {
153 try {
154 LOG(TRACE) << config->getInstanceName() << " Starting";
155
156 localAddress = config->Local::getSocketAddress();
157
158 core::socket::State state = core::socket::STATE_OK;
159
160 if (physicalServerSocket.open(config->getSocketOptions(), PhysicalServerSocket::Flags::NONBLOCK) < 0) {
161 switch (errno) {
162 case EMFILE:
163 case ENFILE:
164 case ENOBUFS:
165 case ENOMEM:
166 PLOG(DEBUG) << config->getInstanceName() << " open: '" << localAddress.toString() << "'";
167
168 state = core::socket::STATE_ERROR;
169 break;
170 default:
171 PLOG(DEBUG) << config->getInstanceName() << " open: '" << localAddress.toString() << "'";
172
173 state = core::socket::STATE_FATAL;
174 break;
175 }
176 } else if (physicalServerSocket.bind(localAddress) < 0) {
177 switch (errno) {
178 case EADDRINUSE:
179 PLOG(DEBUG) << config->getInstanceName() << " bind: '" << localAddress.toString() << "'";
180
181 state = core::socket::STATE_ERROR;
182 break;
183 default:
184 PLOG(DEBUG) << config->getInstanceName() << " bind: '" << localAddress.toString() << "'";
185
186 state = core::socket::STATE_FATAL;
187 break;
188 }
189 } else if (physicalServerSocket.listen(config->getBacklog()) < 0) {
190 switch (errno) {
191 case EADDRINUSE:
192 PLOG(DEBUG) << config->getInstanceName() << " listen: '" << localAddress.toString() << "'";
193
194 state = core::socket::STATE_ERROR;
195 break;
196 default:
197 PLOG(DEBUG) << config->getInstanceName() << " listen: '" << localAddress.toString() << "'";
198
199 state = core::socket::STATE_FATAL;
200 break;
201 }
202 } else {
203 if (enable(physicalServerSocket.getFd())) {
204 LOG(DEBUG) << config->getInstanceName() << " enabled: '" << localAddress.toString() << "' success";
205 } else {
206 LOG(DEBUG) << config->getInstanceName() << " enabled: '" << localAddress.toString() << "' failed";
207
208 state = core::socket::STATE(core::socket::STATE_FATAL, ECANCELED, "SocketAcceptor not enabled");
209 }
210 }
211
212 SocketAddress currentLocalAddress = localAddress;
213 if (localAddress.useNext()) {
214 onStatus(currentLocalAddress, (state | core::socket::State::NO_RETRY));
215
216 LOG(DEBUG) << config->getInstanceName() << " using next SocketAddress: '"
217 << config->Local::getSocketAddress().toString() << "'";
218
220 } else {
221 onStatus(currentLocalAddress, state);
222 }
223 } catch (const typename SocketAddress::BadSocketAddress& badSocketAddress) {
224 LOG(DEBUG) << config->getInstanceName() << " " << badSocketAddress.what();
225
226 onStatus({}, core::socket::STATE(badSocketAddress.getState(), badSocketAddress.getErrnum(), badSocketAddress.what()));
227 }
228 } else {
229 LOG(DEBUG) << config->getInstanceName() << " disabled";
230
231 onStatus({}, core::socket::STATE_DISABLED);
232 }
233
234 if (isEnabled()) {
236 } else {
238 }
239 }
240
241 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
242 void SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::acceptEvent() {
243 int acceptsPerTick = config->getAcceptsPerTick();
244
245 do {
246 PhysicalServerSocket connectedPhysicalSocket(physicalServerSocket.accept4(PhysicalServerSocket::Flags::NONBLOCK),
247 physicalServerSocket.getBindAddress());
248 if (connectedPhysicalSocket.isValid()) {
249 LOG(DEBUG) << "[" << connectedPhysicalSocket.getFd() << " ]" << config->getInstanceName() << ": accept success: '"
250 << connectedPhysicalSocket.getBindAddress().toString() << "'";
251
252 SocketConnection* socketConnection =
253 new SocketConnection(config->getInstanceName(),
254 std::move(connectedPhysicalSocket),
256 localAddress.toString(false),
257 getLocalSocketAddress<SocketAddress>(connectedPhysicalSocket, config),
258 getRemoteSocketAddress<SocketAddress>(connectedPhysicalSocket, config),
259 config->getReadTimeout(),
260 config->getWriteTimeout(),
261 config->getReadBlockSize(),
262 config->getWriteBlockSize(),
263 config->getTerminateTimeout());
264
265 onConnect(socketConnection);
266 onConnected(socketConnection);
267 } else if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
268 PLOG(WARNING) << config->getInstanceName() << " accept failed: '" << physicalServerSocket.getBindAddress().toString()
269 << "'";
270 }
271 } while (--acceptsPerTick > 0);
272 }
273
274 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
275 void SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::unobservedEvent() {
277 }
278
279 template <typename PhysicalSocketServer, typename Config, template <typename PhysicalSocketServerT> typename SocketConnection>
280 void SocketAcceptor<PhysicalSocketServer, Config, SocketConnection>::destruct() {
281 delete this;
282 }
283
284} // namespace core::socket::stream
void setTimeout(const utils::Timeval &timeout)
static void atNextTick(const std::function< void(void)> &callBack)
static constexpr int NO_RETRY
Definition State.h:59
State operator|(int state)
Definition State.cpp:102
SocketAcceptor(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
std::function< void(SocketConnection *)> onConnected
SocketAcceptor(const SocketAcceptor &socketAcceptor)
std::function< void(SocketConnection *)> onConnect
std::function< void(SocketConnection *)> onDisconnect
std::shared_ptr< core::socket::stream::SocketContextFactory > socketContextFactory
std::shared_ptr< Config > config
PhysicalServerSocket physicalServerSocket
std::function< void(const SocketAddress &, core::socket::State)> onStatus
SocketAcceptor(const SocketAcceptor &socketAcceptor)
SocketAcceptor(const std::shared_ptr< core::socket::stream::SocketContextFactory > &socketContextFactory, const std::function< void(SocketConnection *)> &onConnect, const std::function< void(SocketConnection *)> &onConnected, const std::function< void(SocketConnection *)> &onDisconnect, const std::function< void(const SocketAddress &, core::socket::State)> &onStatus, const std::shared_ptr< Config > &config)
SocketAddress getRemoteSocketAddress(PhysicalSocket &physicalSocket, Config &config)
SocketAddress getLocalSocketAddress(PhysicalSocket &physicalSocket, Config &config)
State
Definition State.h:51
State eventLoopState()
Definition State.cpp:52