SNode.C
Loading...
Searching...
No Matches
EventSource.h
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#ifndef WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
43#define WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
44
45#include "core/socket/SocketAddress.h"
46
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
48
49#include "core/socket/State.h"
50#include "log/Logger.h"
51
52#include <cctype>
53#include <cstddef>
54#include <cstdint>
55#include <functional>
56#include <list>
57#include <map>
58#include <memory>
59#include <regex>
60#include <string>
61#include <string_view>
62#include <utility>
63
64#endif // DOXYGEN_SHOULD_SKIP_THIS
65
66namespace web::http::client::tools {
67
69 public:
70 // JS-like readyState
// Connection state of an event source. The underlying type is int and the numeric values are
// pinned explicitly: CONNECTING = 0, OPEN = 1, CLOSED = 2.
71 enum class ReadyState : int { CONNECTING = 0, OPEN = 1, CLOSED = 2 };
72
73 // JS-like MessageEvent (simplified)
// Value object handed by const reference to every message/event listener callback.
74 struct MessageEvent {
75 std::string type; // "message" or custom from "event:"
76 std::string data; // joined data (LF between lines, trailing LF removed)
77 std::string lastEventId; // from "id:"
78 std::string origin; // scheme://host[:port]
79 };
80
81 protected:
83
84 public:
// Virtual destructor: the class is used as a polymorphic base (close() below is pure virtual).
85 virtual ~EventSource();
86
// Listener registration and removal. The definitions are not visible in this header; each function
// returns an EventSource* - presumably `this`, so that calls can be chained (TODO confirm).
// Callbacks for onMessage/addEventListener receive the event as const MessageEvent&.
87 EventSource* onMessage(std::function<void(const MessageEvent&)> messageCallback);
88 EventSource* addEventListener(const std::string& key, std::function<void(const MessageEvent&)> eventListener);
89 EventSource* removeEventListeners(const std::string& type);
90 EventSource* onOpen(std::function<void()> onOpen);
91 EventSource* onError(std::function<void()> onError);
// NOTE(review): line 92 of the original listing is not visible in this view.
93 const std::string& lastEventId() const;
// Getter/setter pair for the retry value (a uint32_t; see SharedState::retry for the stored field).
94 uint32_t retry() const;
95 EventSource* retry(uint32_t retry);
// Must be implemented by the concrete, client-specific subclass.
96 virtual void close() = 0;
97
98 protected:
// State held through a std::shared_ptr so that the callbacks installed on the client can keep
// using it independently of the EventSource object itself.
99 struct SharedState {
// Listener lists. onMessageListener is invoked for events of type "message", onEventListener is
// looked up by event type, onOpenListener runs when the event stream starts, onErrorListener runs
// on disconnect while reconnecting is still enabled.
100 std::list<std::function<void(const MessageEvent&)>> onMessageListener;
101 std::map<std::string, std::list<std::function<void(const MessageEvent&)>>> onEventListener;
102 std::list<std::function<void()>> onOpenListener;
103 std::list<std::function<void()>> onErrorListener;
104
// Stream parsing state.
105 std::string pending; // received bytes not yet split into complete lines
106 std::string data; // "data:" values of the event being assembled, each followed by LF
107 std::string type; // value of the latest "event:" field; empty means "message"
108 std::string idBuf; // value of the latest "id:" field
109 std::string lastId; // copy of idBuf taken whenever a blank line is processed
110 uint32_t retry = 3000; // reconnect delay; divided by 1000 before it is passed to the client config (presumably milliseconds)
111
// NOTE(review): line 112 of the original listing is not visible in this view. The derived class
// reads and writes a member named `ready` holding a ReadyState - presumably declared there.
113 std::string scheme; // "http" or "https"
114 std::string host; // example.com
115 uint16_t port = 80;
116 std::string origin; // scheme://host[:port]
117 std::string path;
118 };
119
120 std::shared_ptr<SharedState> sharedState;
121 };
122
123 template <typename Client>
125 : public EventSource
126 , public std::enable_shared_from_this<EventSourceT<Client>> {
127 public:
128 using MasterRequest = typename Client::MasterRequest;
129 using Request = typename Client::Request;
130 using Response = typename Client::Response;
131 using SocketConnection = typename Client::SocketConnection;
132
133 private:
// Holder for a raw, non-owning pointer to the client's configuration object. It is shared with the
// callbacks through a std::shared_ptr and filled in by init() from client->getConfig(); until
// then the pointer has not been assigned.
134 struct Config {
135 typename Client::Config* config;
136 };
137
/**
 * Checks whether a string consists solely of ASCII decimal digits.
 *
 * Used to validate the value of a "retry:" field before it is converted with parseU32().
 * The test is an explicit '0'..'9' range comparison rather than std::isdigit(): handing a plain
 * char with a negative value (any byte >= 0x80 on platforms where char is signed) to
 * std::isdigit() is undefined behaviour.
 *
 * @param maybeDigitsAsString candidate text; may be empty
 * @return true if the text is non-empty and every character lies in '0'..'9', false otherwise
 */
static bool digits(std::string_view maybeDigitsAsString) {
    if (maybeDigitsAsString.empty()) {
        return false;
    }

    for (const char maybeDigit : maybeDigitsAsString) {
        if (maybeDigit < '0' || maybeDigit > '9') {
            return false;
        }
    }

    return true;
}
147
/**
 * Converts a decimal digit string to an unsigned 32-bit value, saturating at 0xFFFFFFFF.
 *
 * The value is accumulated in 64 bits and the scan stops as soon as the running total no longer
 * fits into 32 bits, so arbitrarily long inputs cannot wrap around.
 *
 * @param uint32AsString text expected to contain only decimal digits (callers validate this first)
 * @return the parsed value, 0 for an empty string, or 0xFFFFFFFF if the number is too large
 */
static uint32_t parseU32(std::string_view uint32AsString) {
    constexpr uint64_t saturated = 0xFFFFFFFFULL;

    uint64_t value = 0;
    for (std::size_t pos = 0; pos < uint32AsString.size() && value <= saturated; ++pos) {
        value = value * 10 + (static_cast<unsigned char>(uint32AsString[pos]) - '0');
    }

    return value > saturated ? 0xFFFFFFFFU : static_cast<uint32_t>(value);
}
160
161 static void deliverMessage(const std::shared_ptr<SharedState>& sharedState,
162 const std::string& evType,
163 const std::string& payload,
164 const std::string& evId) {
165 if (auto it = sharedState->onEventListener.find(evType); it != sharedState->onEventListener.end()) {
166 const MessageEvent e{evType, payload, evId, sharedState->origin};
167 for (const auto& onEventListener : it->second) {
168 onEventListener(e);
169 }
170 }
171
172 if (evType == "message") {
173 const MessageEvent e{evType, payload, evId, sharedState->origin};
174
175 for (const auto& onMessageListener : sharedState->onMessageListener) {
176 onMessageListener(e);
177 }
178 }
179 }
180
181 static void dispatch(const std::shared_ptr<SharedState>& sharedState) {
182 sharedState->lastId = sharedState->idBuf;
183 if (sharedState->data.empty()) {
184 sharedState->type.clear();
185 return;
186 }
187 if (!sharedState->data.empty() && sharedState->data.back() == '\n') {
188 sharedState->data.pop_back();
189 }
190
191 const std::string evType = sharedState->type.empty() ? "message" : std::exchange(sharedState->type, std::string{});
192 const std::string payload = std::exchange(sharedState->data, std::string{});
193 const std::string evId = sharedState->lastId;
194
195 deliverMessage(sharedState, evType, payload, evId);
196 }
197
198 static void
199 processLine(const std::shared_ptr<SharedState>& sharedState, const std::shared_ptr<Config>& sharedConfig, std::string_view line) {
200 if (line.empty()) {
201 dispatch(sharedState);
202 return;
203 }
204 if (line.front() == ':') {
205 return;
206 }
207
208 std::string_view field;
209 std::string_view value;
210
211 if (auto p = line.find(':'); p != std::string_view::npos) {
212 field = line.substr(0, p);
213 value = line.substr(p + 1);
214 if (!value.empty() && value.front() == ' ') {
215 value.remove_prefix(1);
216 }
217 } else {
218 field = line;
219 }
220
221 if (field == "data") {
222 sharedState->data.append(value);
223 sharedState->data.push_back('\n');
224 } else if (field == "event") {
225 sharedState->type = value;
226 } else if (field == "id") {
227 if (value.find('\0') == std::string_view::npos) {
228 sharedState->idBuf = value;
229 }
230 } else if (field == "retry") {
231 if (digits(value)) {
232 uint32_t const retry = parseU32(value);
233
234 sharedState->retry = retry;
235 sharedConfig->config->setReconnectTime(retry / 1000.);
236 sharedConfig->config->setRetryTimeout(retry / 1000.);
237 }
238 }
239 // else ignore
240 }
241
242 static void parse(const std::shared_ptr<SharedState>& sharedState, const std::shared_ptr<Config>& sharedConfig) {
243 size_t start = 0;
244 for (;;) {
245 const size_t i = sharedState->pending.find_first_of("\r\n", start);
246 if (i == std::string::npos) {
247 if (start != 0U) {
248 sharedState->pending.erase(0, start);
249 }
250 break;
251 }
252
253 if (sharedState->pending[i] == '\r' && i + 1 >= sharedState->pending.size()) {
254 if (start != 0U) {
255 sharedState->pending.erase(0, start);
256 }
257 break;
258 }
259
260 const size_t eol =
261 (sharedState->pending[i] == '\r' && i + 1 < sharedState->pending.size() && sharedState->pending[i + 1] == '\n') ? 2 : 1;
262
263 const std::string_view line(&sharedState->pending[start], i - start);
264 start = i + eol;
265 processLine(sharedState, sharedConfig, line);
266 }
267 }
268
269 public:
270 EventSourceT(const EventSourceT&) = delete;
272 EventSourceT(EventSourceT&&) noexcept = delete;
273 EventSourceT& operator=(EventSourceT&&) noexcept = delete;
274
275 protected:
277 : sharedConfig(std::make_shared<Config>()) {
278 }
279
/**
 * Parses the given URL, stores its parts in sharedState, creates the Client together with all of
 * its callbacks, enables reconnect/retry on the client configuration and starts connecting.
 *
 * If the URL does not match the pattern below, nothing happens: no client is created and
 * sharedConfig->config is not assigned.
 *
 * NOTE(review): line 289 of the original listing is not visible in this view; the conditional
 * expression on line 290 is presumably the right-hand side of an assignment to sharedState->port -
 * confirm against the real header.
 * NOTE(review): the port text is converted with std::stoi and cast to uint16_t, so values above
 * 65535 are truncated by the cast, and digit strings outside the int range make std::stoi throw
 * std::out_of_range.
 *
 * @param url endpoint of the event stream; scheme, port and path are optional (the fallbacks used
 *            below are "http", 443 for https or 80 otherwise, and "/")
 */
280 void init(const std::string& url) {
// Capture groups: 1 = scheme including "://", 2 = scheme, 3 = host, 4 = port, 5 = path
281 static const std::regex re(R"(^((https?)://)?([^/:]+)(?::(\d+))?(/.*)?$)");
282
283 std::smatch match;
284 if (std::regex_match(url, match, re)) {
285 // capture URL parts for origin/ports
286 sharedState->scheme = (match[2].matched ? match[2].str() : "http");
287 sharedState->host = match[3].str();
288 const bool isHttps = (sharedState->scheme == "https");
// Explicit port if present, otherwise the default port of the scheme
290 match[4].matched ? static_cast<uint16_t>(std::stoi(match[4].str())) : static_cast<uint16_t>(isHttps ? 443 : 80);
// The origin omits the port when it is the default one for the scheme
291 sharedState->origin = sharedState->scheme + std::string("://") + sharedState->host +
292 (((isHttps && sharedState->port == 443) || (!isHttps && sharedState->port == 80))
293 ? ""
294 : (":" + std::to_string(sharedState->port)));
295
296 sharedState->path = (match[5].matched ? match[5].str() : "/");
297
298 LOG(TRACE) << "Full protocol: " << match[1];
299 LOG(TRACE) << " protocol: " << sharedState->scheme;
300 LOG(TRACE) << " Host: " << sharedState->host;
301 LOG(TRACE) << " Port: " << sharedState->port;
302 LOG(TRACE) << " Path: " << sharedState->path;
303 LOG(TRACE) << " Origin: " << sharedState->origin;
304
// The callbacks only hold a weak reference to this object, so they do not keep it alive.
// NOTE(review): weak_from_this() yields an empty weak_ptr unless this object is already owned
// by a std::shared_ptr when init() runs - confirm how instances are created.
305 const std::weak_ptr<EventSourceT> eventStreamWeak = this->weak_from_this();
306
307 client = std::make_shared<Client>(
// OnConnect: remember the connection so that close() can shut it down
308 [eventStreamWeak](SocketConnection* socketConnection) {
309 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnConnect";
310
311 if (const std::shared_ptr<EventSourceT> eventStream = eventStreamWeak.lock()) {
312 eventStream->socketConnection = socketConnection;
313 }
314 },
// OnConnected: logging only. NOTE(review): the captured `state` is not used in this callback.
315 [state = this->sharedState](SocketConnection* socketConnection) {
316 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnConnected";
317 },
// OnDisconnect: forget the connection. While reconnecting is still enabled the error listeners
// are notified, the state goes back to CONNECTING and the stored retry value is applied to the
// client configuration; otherwise the state becomes CLOSED.
318 [eventStreamWeak, sharedState = this->sharedState, sharedConfig = this->sharedConfig](
319 SocketConnection* socketConnection) {
320 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnDisconnect";
321
322 if (const std::shared_ptr<EventSourceT> eventStream = eventStreamWeak.lock()) {
323 eventStream->socketConnection = nullptr;
324 }
325
326 if (sharedConfig->config->getReconnect()) {
327 for (const auto& onError : sharedState->onErrorListener) {
328 onError();
329 }
330 sharedState->ready = ReadyState::CONNECTING;
331
332 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
333 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
334 } else {
335 sharedState->ready = ReadyState::CLOSED;
336 }
337 },
// OnRequestStart: issue the event source request for the path (same fallback "/" as above)
338 [url = match[5].matched ? match[5].str() : "/", sharedState = this->sharedState, sharedConfig = this->sharedConfig](
339 const std::shared_ptr<MasterRequest>& masterRequest) {
340 const std::string connectionName = masterRequest->getSocketContext()->getSocketConnection()->getConnectionName();
341
342 LOG(DEBUG) << connectionName << " EventSource: OnRequestStart";
343
344 masterRequest->requestEventSource(
345 url,
// Data callback: reads up to 16384 bytes from the peer, appends them to the pending buffer and
// parses the complete lines; returns the number of bytes read (0 if the request is gone).
346 [masterRequestWeak = std::weak_ptr<MasterRequest>(masterRequest), sharedState, sharedConfig, connectionName]()
347 -> std::size_t {
348 std::size_t consumed = 0;
349
350 if (const std::shared_ptr<MasterRequest> masterRequest = masterRequestWeak.lock()) {
351 char buf[16384];
352 consumed = masterRequest->getSocketContext()->readFromPeer(buf, sizeof(buf));
353
354 if (consumed > 0) {
355 sharedState->pending.append(buf, consumed);
356 parse(sharedState, sharedConfig);
357 }
358 } else {
359 LOG(DEBUG) << connectionName << ": server-sent event: server disconnect";
360 }
361
362 return consumed;
363 },
// Stream-start callback: mark the source OPEN, apply the stored retry value to the client
// configuration and notify the open listeners.
364 [sharedState, sharedConfig, connectionName]() {
365 LOG(DEBUG) << connectionName << ": server-sent event stream start";
366
367 sharedState->ready = ReadyState::OPEN;
368
369 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
370 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
371
372 for (const auto& onOpen : sharedState->onOpenListener) {
373 onOpen();
374 }
375 },
// Failure callback: only logs that the endpoint does not serve an event stream
376 [sharedState, connectionName]() {
377 LOG(DEBUG) << connectionName
378 << ": not an server-sent event endpoint: " << sharedState->origin + sharedState->path;
379 });
380 },
// OnRequestEnd: logging only
381 [](const std::shared_ptr<Request>& req) {
382 LOG(DEBUG) << req->getSocketContext()->getSocketConnection()->getConnectionName() << " EventSource: OnRequestEnd";
383 });
384
// Point the client at the parsed host/port and switch on reconnect and retry
385 client->getConfig().Remote::setHost(sharedState->host);
386 client->getConfig().Remote::setPort(sharedState->port);
387 client->getConfig().setReconnect();
388 client->getConfig().setRetry();
389 client->getConfig().setRetryBase(1);
390
// Publish the configuration to the callbacks; the pointer refers to the config owned by `client`
391 sharedConfig->config = &client->getConfig();
392
// The connect status callback only logs the outcome
393 client->connect([instanceName = client->getConfig().getInstanceName()](
394 const core::socket::SocketAddress& socketAddress,
395 const core::socket::State& state) { // example.com:81 simulate connect timeout
396 switch (state) {
397 case core::socket::State::OK:
398 LOG(DEBUG) << instanceName << ": connected to '" << socketAddress.toString() << "'";
399 break;
400 case core::socket::State::DISABLED:
401 LOG(DEBUG) << instanceName << ": disabled";
402 break;
403 case core::socket::State::ERROR:
404 LOG(DEBUG) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
405 break;
406 case core::socket::State::FATAL:
407 LOG(DEBUG) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
408 break;
409 }
410 });
411 }
412 }
413
414 public:
415 void close() override {
416 sharedConfig->config->setReconnect(false);
417 sharedConfig->config->setRetry(false);
418
419 if (socketConnection != nullptr) {
420 socketConnection->shutdownRead();
421 }
422 }
423
424 private:
425 std::shared_ptr<Client> client;
426 SocketConnection* socketConnection = nullptr;
427 std::shared_ptr<Config> sharedConfig;
428 };
429
430} // namespace web::http::client::tools
431
432#endif // WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
virtual std::string toString(bool expanded=true) const =0
static constexpr int DISABLED
Definition State.h:56
static constexpr int ERROR
Definition State.h:57
std::string what() const
Definition State.cpp:114
static constexpr int FATAL
Definition State.h:58
static constexpr int OK
Definition State.h:55
static void parse(const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< Config > &sharedConfig)
static void processLine(const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< Config > &sharedConfig, std::string_view line)
EventSourceT(EventSourceT &&) noexcept=delete
std::shared_ptr< Config > sharedConfig
EventSourceT & operator=(EventSourceT &&) noexcept=delete
static void dispatch(const std::shared_ptr< SharedState > &sharedState)
EventSourceT(const EventSourceT &)=delete
static bool digits(std::string_view maybeDigitsAsString)
static uint32_t parseU32(std::string_view uint32AsString)
EventSourceT & operator=(const EventSourceT &)=delete
static void deliverMessage(const std::shared_ptr< SharedState > &sharedState, const std::string &evType, const std::string &payload, const std::string &evId)
void init(const std::string &url)
std::shared_ptr< Client > client
EventSource * onMessage(std::function< void(const MessageEvent &)> messageCallback)
EventSource * onOpen(std::function< void()> onOpen)
const std::string & lastEventId() const
EventSource * addEventListener(const std::string &key, std::function< void(const MessageEvent &)> eventListener)
EventSource * removeEventListeners(const std::string &type)
std::shared_ptr< SharedState > sharedState
EventSource * onError(std::function< void()> onError)
EventSource::ReadyState readyState() const
EventSource * retry(uint32_t retry)
std::list< std::function< void()> > onOpenListener
std::list< std::function< void()> > onErrorListener
std::list< std::function< void(const MessageEvent &)> > onMessageListener
std::map< std::string, std::list< std::function< void(const MessageEvent &)> > > onEventListener