2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#ifndef WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
43#define WEB_HTTP_CLIENT_TOOLS_EVENTSOURCE_H
45#include "core/socket/SocketAddress.h"
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
49#include "core/socket/State.h"
50#include "log/Logger.h"
66namespace web::http::client::
tools {
94 uint32_t
retry()
const;
123 template <
typename Client>
126 ,
public std::enable_shared_from_this<
EventSourceT<Client>> {
128 using MasterRequest =
typename Client::MasterRequest;
129 using Request =
typename Client::Request;
130 using Response =
typename Client::Response;
131 using SocketConnection =
typename Client::SocketConnection;
138 static bool digits(std::string_view maybeDigitsAsString) {
139 for (
const char maybeDigit : maybeDigitsAsString) {
140 if (std::isdigit(maybeDigit) == 0) {
145 return !maybeDigitsAsString.empty();
148 static uint32_t
parseU32(std::string_view uint32AsString) {
149 uint64_t uint32AsUint64 = 0;
151 for (
const char digit : uint32AsString) {
152 uint32AsUint64 = uint32AsUint64 * 10 + (
static_cast<
unsigned char>(digit) -
'0');
153 if (uint32AsUint64 > 0xFFFFFFFFULL) {
158 return static_cast<uint32_t>(uint32AsUint64);
162 const std::string& evType,
163 const std::string& payload,
164 const std::string& evId) {
167 for (
const auto& onEventListener : it->second) {
172 if (evType ==
"message") {
176 onMessageListener(e);
183 if (sharedState->
data.empty()) {
184 sharedState->
type.clear();
187 if (!sharedState->
data.empty() && sharedState->
data.back() ==
'\n') {
188 sharedState->
data.pop_back();
191 const std::string evType = sharedState->
type.empty() ?
"message" : std::exchange(sharedState->
type, std::string{});
192 const std::string payload = std::exchange(sharedState->
data, std::string{});
193 const std::string evId = sharedState->
lastId;
204 if (line.front() ==
':') {
208 std::string_view field;
209 std::string_view value;
211 if (
auto p = line.find(
':'); p != std::string_view::npos) {
212 field = line.substr(0, p);
213 value = line.substr(p + 1);
214 if (!value.empty() && value.front() ==
' ') {
215 value.remove_prefix(1);
221 if (field ==
"data") {
222 sharedState->
data.append(value);
223 sharedState->
data.push_back(
'\n');
224 }
else if (field ==
"event") {
225 sharedState->
type = value;
226 }
else if (field ==
"id") {
227 if (value.find(
'\0') == std::string_view::npos) {
228 sharedState->
idBuf = value;
230 }
else if (field ==
"retry") {
234 sharedState->
retry = retry;
235 sharedConfig->config->setReconnectTime(retry / 1000.);
236 sharedConfig->config->setRetryTimeout(retry / 1000.);
245 const size_t i = sharedState->
pending.find_first_of(
"\r\n", start);
246 if (i == std::string::npos) {
248 sharedState->
pending.erase(0, start);
253 if (sharedState->
pending[i] ==
'\r' && i + 1 >= sharedState->
pending.size()) {
255 sharedState->
pending.erase(0, start);
261 (sharedState->
pending[i] ==
'\r' && i + 1 < sharedState->
pending.size() && sharedState->
pending[i + 1] ==
'\n') ? 2 : 1;
263 const std::string_view line(&sharedState->
pending[start], i - start);
280 void init(
const std::string& url) {
281 static const std::regex re(R"(^((https?)://)?([^/:]+)(?::(\d+))?(/.*)?$)");
284 if (std::regex_match(url, match, re)) {
290 match[4].matched ?
static_cast<uint16_t>(std::stoi(match[4].str())) :
static_cast<uint16_t>(isHttps ? 443 : 80);
298 LOG(TRACE) <<
"Full protocol: " << match[1];
305 const std::weak_ptr<
EventSourceT> eventStreamWeak =
this->weak_from_this();
307 client = std::make_shared<Client>(
308 [eventStreamWeak](SocketConnection* socketConnection) {
309 LOG(DEBUG) << socketConnection->getConnectionName() <<
" EventSource: OnConnect";
311 if (
const std::shared_ptr<
EventSourceT> eventStream = eventStreamWeak.lock()) {
312 eventStream->socketConnection = socketConnection;
315 [state =
this->sharedState](SocketConnection* socketConnection) {
316 LOG(DEBUG) << socketConnection->getConnectionName() <<
" EventSource: OnConnected";
318 [eventStreamWeak, sharedState =
this->sharedState, sharedConfig
= this->sharedConfig](
319 SocketConnection* socketConnection) {
320 LOG(DEBUG) << socketConnection->getConnectionName() <<
" EventSource: OnDisconnect";
322 if (
const std::shared_ptr<
EventSourceT> eventStream = eventStreamWeak.lock()) {
323 eventStream->socketConnection =
nullptr;
326 if (sharedConfig->config->getReconnect()) {
327 for (
const auto& onError : sharedState->onErrorListener) {
332 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
333 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
338 [url = match[5].matched ? match[5].str() :
"/", sharedState =
this->sharedState, sharedConfig
= this->sharedConfig](
339 const std::shared_ptr<MasterRequest>& masterRequest) {
340 const std::string connectionName = masterRequest->getSocketContext()->getSocketConnection()->getConnectionName();
342 LOG(DEBUG) << connectionName <<
" EventSource: OnRequestStart";
344 masterRequest->requestEventSource(
346 [masterRequestWeak = std::weak_ptr<MasterRequest>(masterRequest), sharedState, sharedConfig, connectionName]()
348 std::size_t consumed = 0;
350 if (
const std::shared_ptr<MasterRequest> masterRequest = masterRequestWeak.lock()) {
352 consumed = masterRequest->getSocketContext()->readFromPeer(buf,
sizeof(buf));
355 sharedState->pending.append(buf, consumed);
359 LOG(DEBUG) << connectionName <<
": server-sent event: server disconnect";
364 [sharedState, sharedConfig, connectionName]() {
365 LOG(DEBUG) << connectionName <<
": server-sent event stream start";
369 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
370 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
372 for (
const auto& onOpen : sharedState->onOpenListener) {
376 [sharedState, connectionName]() {
377 LOG(DEBUG) << connectionName
378 <<
": not an server-sent event endpoint: " << sharedState->origin + sharedState->path;
381 [](
const std::shared_ptr<Request>& req) {
382 LOG(DEBUG) << req->getSocketContext()->getSocketConnection()->getConnectionName() <<
" EventSource: OnRequestEnd";
387 client->getConfig().setReconnect();
388 client->getConfig().setRetry();
389 client->getConfig().setRetryBase(1);
393 client->connect([instanceName =
client->getConfig().getInstanceName()](
395 const core::socket::
State& state) {
398 LOG(DEBUG) << instanceName <<
": connected to '" << socketAddress
.toString() <<
"'";
401 LOG(DEBUG) << instanceName <<
": disabled";
404 LOG(DEBUG) << instanceName <<
": " << socketAddress
.toString() <<
": " << state
.what();
407 LOG(DEBUG) << instanceName <<
": " << socketAddress
.toString() <<
": " << state
.what();
virtual std::string toString(bool expanded=true) const =0
static constexpr int DISABLED
static constexpr int ERROR
static constexpr int FATAL