SNode.C
Loading...
Searching...
No Matches
EventSource.h
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#ifndef WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
43#define WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
44
45#include "core/socket/SocketAddress.h"
46
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
48
49#include "core/socket/State.h"
50#include "log/Logger.h"
51
52#include <cctype>
53#include <cstddef>
54#include <cstdint>
55#include <functional>
56#include <list>
57#include <map>
58#include <memory>
59#include <string>
60#include <string_view>
61#include <utility>
62
63#endif // DOXYGEN_SHOULD_SKIP_THIS
64
65namespace web::http::client::tools {
66
68 public:
/// Connection life-cycle state; the numeric values follow the JS EventSource.readyState constants.
enum class ReadyState : int {
    CONNECTING = 0, ///< no stream is open yet (or a reconnect is pending)
    OPEN = 1,       ///< the event stream has started
    CLOSED = 2      ///< the source has been closed
};
71
/// Simplified counterpart of the JS MessageEvent handed to every listener.
struct MessageEvent {
    /// Event name: "message", or the custom name taken from an "event:" field.
    std::string type;
    /// Payload: the "data:" lines joined by LF, without the trailing LF.
    std::string data;
    /// Identifier taken from the "id:" field.
    std::string lastEventId;
    /// Origin of the stream in the form scheme://host[:port].
    std::string origin;
};
79
80 protected:
82
83 public:
84 virtual ~EventSource();
85
86 EventSource* onMessage(std::function<void(const MessageEvent&)> messageCallback);
87 EventSource* addEventListener(const std::string& key, std::function<void(const MessageEvent&)> eventListener);
88 EventSource* removeEventListeners(const std::string& type);
89 EventSource* onOpen(std::function<void()> onOpen);
90 EventSource* onError(std::function<void()> onError);
92 const std::string& lastEventId() const;
93 uint32_t retry() const;
94 EventSource* retry(uint32_t retry);
95 virtual void close() = 0;
96
97 protected:
98 struct SharedState {
99 std::list<std::function<void(const MessageEvent&)>> onMessageListener;
100 std::map<std::string, std::list<std::function<void(const MessageEvent&)>>> onEventListener;
101 std::list<std::function<void()>> onOpenListener;
102 std::list<std::function<void()>> onErrorListener;
103
104 std::string pending;
105 std::string data;
106 std::string type;
107 std::string idBuf;
108 std::string lastId;
109 uint32_t retry = 3000;
110
111 std::string origin; // scheme://host[:port]
112 std::string path;
113
115 };
116
117 std::shared_ptr<SharedState> sharedState;
118 };
119
120 template <typename Client>
122 : public EventSource
123 , public std::enable_shared_from_this<EventSourceT<Client>> {
124 public:
125 using MasterRequest = typename Client::MasterRequest;
126 using Request = typename Client::Request;
127 using Response = typename Client::Response;
128 using SocketConnection = typename Client::SocketConnection;
130 using Config = typename Client::Config;
131
132 private:
135 };
136
/**
 * Tell whether a text is a non-empty run of decimal digits.
 *
 * @param maybeDigitsAsString text to classify
 * @return true only if the text holds at least one character and std::isdigit
 *         accepts every one of them; false for the empty text
 */
static bool digits(std::string_view maybeDigitsAsString) {
    if (maybeDigitsAsString.empty()) {
        return false;
    }

    auto cursor = maybeDigitsAsString.begin();
    const auto end = maybeDigitsAsString.end();

    // std::isdigit requires a value representable as unsigned char, hence the cast.
    while (cursor != end && std::isdigit(static_cast<unsigned char>(*cursor)) != 0) {
        ++cursor;
    }

    // Reaching the end means no character was rejected.
    return cursor == end;
}
146
/**
 * Convert a decimal digit string to an unsigned 32 bit value, saturating on overflow.
 *
 * The characters are not validated here; each one is folded in as (character - '0').
 *
 * @param uint32AsString decimal digits to convert
 * @return the parsed value, 0 for an empty string, or 0xFFFFFFFF as soon as the
 *         running value exceeds the 32 bit range
 */
static uint32_t parseU32(std::string_view uint32AsString) {
    constexpr uint64_t kSaturated = 0xFFFFFFFFULL;

    // A 64 bit accumulator leaves enough headroom to detect the overflow after each step.
    uint64_t accumulator = 0;

    for (std::size_t pos = 0; pos < uint32AsString.size(); ++pos) {
        accumulator = accumulator * 10 + (static_cast<unsigned char>(uint32AsString[pos]) - '0');

        if (accumulator > kSaturated) {
            return static_cast<uint32_t>(kSaturated);
        }
    }

    return static_cast<uint32_t>(accumulator);
}
159
160 static void deliverMessage(const std::shared_ptr<SharedState>& sharedState,
161 const std::string& evType,
162 const std::string& payload,
163 const std::string& evId) {
164 const MessageEvent e{evType, payload, evId, sharedState->origin};
165
166 if (auto it = sharedState->onEventListener.find(evType); it != sharedState->onEventListener.end()) {
167 for (const auto& onEventListener : it->second) {
168 onEventListener(e);
169 }
170 }
171
172 if (evType == "message") {
173 for (const auto& onMessageListener : sharedState->onMessageListener) {
174 onMessageListener(e);
175 }
176 }
177 }
178
179 static void dispatch(const std::shared_ptr<SharedState>& sharedState) {
180 sharedState->lastId = sharedState->idBuf;
181 if (sharedState->data.empty()) {
182 sharedState->type.clear();
183 return;
184 }
185 if (!sharedState->data.empty() && sharedState->data.back() == '\n') {
186 sharedState->data.pop_back();
187 }
188
189 const std::string evType = sharedState->type.empty() ? "message" : std::exchange(sharedState->type, std::string{});
190 const std::string payload = std::exchange(sharedState->data, std::string{});
191 const std::string evId = sharedState->lastId;
192
193 deliverMessage(sharedState, evType, payload, evId);
194 }
195
196 static bool processLine(const std::shared_ptr<SharedState>& sharedState,
197 const std::shared_ptr<SharedConfig>& sharedConfig,
198 std::string_view line) {
199 bool success = true;
200
201 if (!line.empty() && line.front() != ':') {
202 std::string_view field;
203 std::string_view value;
204
205 if (auto p = line.find(':'); p != std::string_view::npos) {
206 field = line.substr(0, p);
207 value = line.substr(p + 1);
208 if (!value.empty() && value.front() == ' ') {
209 value.remove_prefix(1);
210 }
211 } else {
212 field = line;
213 }
214
215 if (field == "data") {
216 static constexpr size_t kMaxDataSize = 1 << 20;
217
218 success = sharedState->data.size() + value.size() + 1 <= kMaxDataSize; // Data length
219
220 if (success) {
221 sharedState->data.append(value);
222 sharedState->data.push_back('\n');
223 }
224 } else if (field == "event") {
225 sharedState->type = value;
226 } else if (field == "id") {
227 if (value.find('\0') == std::string_view::npos) {
228 sharedState->idBuf = value;
229 }
230 } else if (field == "retry") {
231 if (digits(value)) {
232 uint32_t const retry = parseU32(value);
233
234 sharedState->retry = retry;
235 sharedConfig->config->setReconnectTime(retry / 1000.);
236 sharedConfig->config->setRetryTimeout(retry / 1000.);
237 }
238 }
239 }
240
241 return success;
242 }
243
244 static bool parse(const std::shared_ptr<SharedState>& sharedState, const std::shared_ptr<SharedConfig>& sharedConfig) {
245 static constexpr size_t kMaxLine = 256 * 1024;
246
247 bool success = true;
248 bool eol = true;
249 do {
250 const size_t i = sharedState->pending.find_first_of("\r\n");
251
252 eol = i != std::string::npos && (sharedState->pending[i] != '\r' || i + 1 < sharedState->pending.size());
253
254 if (eol && i <= kMaxLine) {
255 const size_t eolLength =
256 (sharedState->pending[i] == '\r' && i + 1 < sharedState->pending.size() && sharedState->pending[i + 1] == '\n') ? 2
257 : 1;
258
259 const std::string_view line(sharedState->pending.data(), i);
260
261 if (!line.empty()) {
262 success = processLine(sharedState, sharedConfig, line);
263 } else {
264 dispatch(sharedState);
265 }
266
267 sharedState->pending.erase(0, i + eolLength);
268 } else {
269 success = sharedState->pending.size() <= kMaxLine; // Line length
270 }
271 } while (eol && success);
272
273 return success;
274 }
275
276 public:
277 EventSourceT(const EventSourceT&) = delete;
279 EventSourceT(EventSourceT&&) noexcept = delete;
280 EventSourceT& operator=(EventSourceT&&) noexcept = delete;
281
282 protected:
284 : sharedConfig(std::make_shared<SharedConfig>()) {
285 }
286
287 void init(const std::string& scheme, const SocketAddress& socketAddress, const std::string& path) {
288 sharedState->path = path;
289 sharedState->origin = scheme + "://" + socketAddress.toString(false);
290
291 LOG(TRACE) << "Origin: " << sharedState->origin;
292 LOG(TRACE) << " Path: " << sharedState->path;
293
294 const std::weak_ptr<EventSourceT> eventSourceWeak = this->weak_from_this();
295
296 client = std::make_shared<Client>(
297 [eventSourceWeak](SocketConnection* socketConnection) {
298 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnConnect";
299
300 if (const std::shared_ptr<EventSourceT> eventStream = eventSourceWeak.lock()) {
301 eventStream->socketConnection = socketConnection;
302 }
303 },
304 [](SocketConnection* socketConnection) {
305 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnConnected";
306 },
307 [eventSourceWeak, sharedState = this->sharedState, sharedConfig = this->sharedConfig](SocketConnection* socketConnection) {
308 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnDisconnect";
309
310 if (const std::shared_ptr<EventSourceT> eventSource = eventSourceWeak.lock()) {
311 eventSource->socketConnection = nullptr;
312 }
313
314 if (sharedState->ready != ReadyState::CLOSED) {
315 if (auto it = sharedState->onEventListener.find("error"); it != sharedState->onEventListener.end()) {
316 EventSource::MessageEvent e{"error", "", sharedState->lastId, sharedState->origin};
317
318 for (auto& onError : it->second) {
319 onError(e);
320 }
321 }
322
323 for (const auto& onError : sharedState->onErrorListener) {
324 onError();
325 }
326
327 sharedState->ready = ReadyState::CONNECTING;
328 sharedState->data.clear();
329 sharedState->type.clear();
330 sharedState->idBuf.clear();
331 sharedState->pending.clear();
332
333 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
334 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
335 }
336 },
337 [eventSourceWeak, sharedState = this->sharedState, sharedConfig = this->sharedConfig](
338 const std::shared_ptr<MasterRequest>& masterRequest) {
339 const std::string connectionName = masterRequest->getSocketContext()->getSocketConnection()->getConnectionName();
340
341 LOG(DEBUG) << connectionName << " EventSource: OnRequestStart";
342
343 if (!sharedState->lastId.empty()) {
344 masterRequest->set("Last-Event-ID", sharedState->lastId);
345 }
346
347 if (!masterRequest->requestEventSource(
348 sharedState->path,
349 [masterRequestWeak = std::weak_ptr<MasterRequest>(masterRequest), sharedState, sharedConfig, connectionName]()
350 -> std::size_t {
351 std::size_t consumed = 0;
352
353 if (const std::shared_ptr<MasterRequest> masterRequest = masterRequestWeak.lock()) {
354 char buf[16384];
355 consumed = masterRequest->getSocketContext()->readFromPeer(buf, sizeof(buf));
356
357 sharedState->pending.append(buf, consumed);
358
359 if (!parse(sharedState, sharedConfig)) {
360 masterRequest->getSocketContext()->shutdownWrite(true);
361 }
362 } else {
363 LOG(DEBUG) << connectionName << ": server-sent event: server disconnect";
364 }
365
366 return consumed;
367 },
368 [sharedState, sharedConfig, connectionName]() {
369 LOG(DEBUG) << connectionName << ": server-sent event stream start";
370
371 sharedState->ready = ReadyState::OPEN;
372
373 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
374 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
375
376 if (auto it = sharedState->onEventListener.find("open"); it != sharedState->onEventListener.end()) {
377 EventSource::MessageEvent e{"open", "", sharedState->lastId, sharedState->origin};
378
379 for (auto& onOpen : it->second) {
380 onOpen(e);
381 }
382 }
383
384 for (const auto& onOpen : sharedState->onOpenListener) {
385 onOpen();
386 }
387 },
388 [sharedState, connectionName]() {
389 LOG(DEBUG) << connectionName
390 << ": not an server-sent event endpoint: " << sharedState->origin + sharedState->path;
391 })) {
392 if (const std::shared_ptr<EventSourceT> eventSource = eventSourceWeak.lock()) {
393 eventSource->socketConnection->close();
394 }
395 }
396 },
397 [](const std::shared_ptr<Request>& req) {
398 LOG(DEBUG) << req->getSocketContext()->getSocketConnection()->getConnectionName() << " EventSource: OnRequestEnd";
399 });
400
401 client->getConfig().Remote::setSocketAddress(socketAddress);
402 client->getConfig().setReconnect();
403 client->getConfig().setRetry();
404 client->getConfig().setRetryBase(1);
405
406 sharedConfig->config = &client->getConfig();
407
408 client->connect([instanceName = client->getConfig().getInstanceName()](
409 const core::socket::SocketAddress& socketAddress,
410 const core::socket::State& state) { // example.com:81 simulate connnect timeout
411 switch (state) {
412 case core::socket::State::OK:
413 LOG(DEBUG) << instanceName << ": connected to '" << socketAddress.toString() << "'";
414 break;
415 case core::socket::State::DISABLED:
416 LOG(DEBUG) << instanceName << ": disabled";
417 break;
418 case core::socket::State::ERROR:
419 LOG(DEBUG) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
420 break;
421 case core::socket::State::FATAL:
422 LOG(DEBUG) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
423 break;
424 }
425 });
426 }
427
428 public:
429 void close() override {
431
432 sharedConfig->config->setReconnect(false);
433 sharedConfig->config->setRetry(false);
434
435 if (socketConnection != nullptr) {
436 socketConnection->shutdownWrite(true);
437 }
438 }
439
440 private:
441 std::shared_ptr<Client> client;
443 std::shared_ptr<SharedConfig> sharedConfig;
444 };
445
446} // namespace web::http::client::tools
447
448#endif // WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
virtual std::string toString(bool expanded=true) const =0
static constexpr int DISABLED
Definition State.h:56
static constexpr int ERROR
Definition State.h:57
std::string what() const
Definition State.cpp:114
static constexpr int FATAL
Definition State.h:58
static constexpr int OK
Definition State.h:55
typename SocketConnection::SocketAddress SocketAddress
std::shared_ptr< SharedConfig > sharedConfig
void init(const std::string &scheme, const SocketAddress &socketAddress, const std::string &path)
static bool parse(const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< SharedConfig > &sharedConfig)
EventSourceT(EventSourceT &&) noexcept=delete
typename Client::Request Request
typename Client::MasterRequest MasterRequest
EventSourceT & operator=(EventSourceT &&) noexcept=delete
static void dispatch(const std::shared_ptr< SharedState > &sharedState)
EventSourceT(const EventSourceT &)=delete
static bool digits(std::string_view maybeDigitsAsString)
static bool processLine(const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< SharedConfig > &sharedConfig, std::string_view line)
static uint32_t parseU32(std::string_view uint32AsString)
typename Client::SocketConnection SocketConnection
typename Client::Response Response
EventSourceT & operator=(const EventSourceT &)=delete
static void deliverMessage(const std::shared_ptr< SharedState > &sharedState, const std::string &evType, const std::string &payload, const std::string &evId)
std::shared_ptr< Client > client
EventSource * onMessage(std::function< void(const MessageEvent &)> messageCallback)
EventSource * onOpen(std::function< void()> onOpen)
const std::string & lastEventId() const
EventSource * addEventListener(const std::string &key, std::function< void(const MessageEvent &)> eventListener)
EventSource * removeEventListeners(const std::string &type)
std::shared_ptr< SharedState > sharedState
EventSource * onError(std::function< void()> onError)
EventSource::ReadyState readyState() const
EventSource * retry(uint32_t retry)
std::list< std::function< void()> > onOpenListener
std::list< std::function< void()> > onErrorListener
std::list< std::function< void(const MessageEvent &)> > onMessageListener
Definition EventSource.h:99
std::map< std::string, std::list< std::function< void(const MessageEvent &)> > > onEventListener