SNode.C
Loading...
Searching...
No Matches
web::http::client::tools::EventSourceT< Client > Class Template Reference

#include <EventSource.h>

Inheritance diagram for web::http::client::tools::EventSourceT< Client >:
Collaboration diagram for web::http::client::tools::EventSourceT< Client >:

Classes

struct  SharedConfig

Public Types

using MasterRequest = typename Client::MasterRequest
using Request = typename Client::Request
using Response = typename Client::Response
using SocketConnection = typename Client::SocketConnection
using SocketAddress = typename SocketConnection::SocketAddress
using Config = typename Client::Config
Public Types inherited from web::http::client::tools::EventSource
enum class  ReadyState : int { CONNECTING = 0 , OPEN = 1 , CLOSED = 2 }

Public Member Functions

 EventSourceT (const EventSourceT &)=delete
EventSourceT & operator= (const EventSourceT &)=delete
 EventSourceT (EventSourceT &&) noexcept=delete
EventSourceT & operator= (EventSourceT &&) noexcept=delete
void close () override
Public Member Functions inherited from web::http::client::tools::EventSource
virtual ~EventSource ()
EventSource * onMessage (std::function< void(const MessageEvent &)> messageCallback)
EventSource * addEventListener (const std::string &key, std::function< void(const MessageEvent &)> eventListener)
EventSource * removeEventListeners (const std::string &type)
EventSource * onOpen (std::function< void()> onOpen)
EventSource * onError (std::function< void()> onError)
EventSource::ReadyState readyState () const
const std::string & lastEventId () const
uint32_t retry () const
EventSource * retry (uint32_t retry)

Protected Member Functions

 EventSourceT ()
void init (const std::string &scheme, const SocketAddress &socketAddress, const std::string &path)
Protected Member Functions inherited from web::http::client::tools::EventSource
 EventSource ()

Static Private Member Functions

static bool digits (std::string_view maybeDigitsAsString)
static uint32_t parseU32 (std::string_view uint32AsString)
static void deliverMessage (const std::shared_ptr< SharedState > &sharedState, const std::string &evType, const std::string &payload, const std::string &evId)
static void dispatch (const std::shared_ptr< SharedState > &sharedState)
static bool processLine (const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< SharedConfig > &sharedConfig, std::string_view line)
static bool parse (const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< SharedConfig > &sharedConfig)

Private Attributes

std::shared_ptr< Client > client
SocketConnection * socketConnection = nullptr
std::shared_ptr< SharedConfig > sharedConfig

Additional Inherited Members

Protected Attributes inherited from web::http::client::tools::EventSource
std::shared_ptr< SharedState > sharedState

Detailed Description

template<typename Client>
class web::http::client::tools::EventSourceT< Client >

Definition at line 121 of file EventSource.h.

Member Typedef Documentation

◆ Config

template<typename Client>
using web::http::client::tools::EventSourceT< Client >::Config = typename Client::Config

Definition at line 130 of file EventSource.h.

◆ MasterRequest

template<typename Client>
using web::http::client::tools::EventSourceT< Client >::MasterRequest = typename Client::MasterRequest

Definition at line 125 of file EventSource.h.

◆ Request

template<typename Client>
using web::http::client::tools::EventSourceT< Client >::Request = typename Client::Request

Definition at line 126 of file EventSource.h.

◆ Response

template<typename Client>
using web::http::client::tools::EventSourceT< Client >::Response = typename Client::Response

Definition at line 127 of file EventSource.h.

◆ SocketAddress

template<typename Client>
using web::http::client::tools::EventSourceT< Client >::SocketAddress = typename SocketConnection::SocketAddress

Definition at line 129 of file EventSource.h.

◆ SocketConnection

template<typename Client>
using web::http::client::tools::EventSourceT< Client >::SocketConnection = typename Client::SocketConnection

Definition at line 128 of file EventSource.h.

Constructor & Destructor Documentation

◆ EventSourceT() [1/3]

template<typename Client>
web::http::client::tools::EventSourceT< Client >::EventSourceT ( const EventSourceT< Client > & )
delete

◆ EventSourceT() [2/3]

template<typename Client>
web::http::client::tools::EventSourceT< Client >::EventSourceT ( EventSourceT< Client > && )
delete noexcept

◆ EventSourceT() [3/3]

template<typename Client>
web::http::client::tools::EventSourceT< Client >::EventSourceT ( )
inline protected

Member Function Documentation

◆ close()

template<typename Client>
void web::http::client::tools::EventSourceT< Client >::close ( )
inline override virtual

Implements web::http::client::tools::EventSource.

Definition at line 429 of file EventSource.h.

429 {
431
432 sharedConfig->config->setReconnect(false);
433 sharedConfig->config->setRetry(false);
434
435 if (socketConnection != nullptr) {
436 socketConnection->shutdownWrite(true);
437 }
438 }
std::shared_ptr< SharedState > sharedState

◆ deliverMessage()

template<typename Client>
void web::http::client::tools::EventSourceT< Client >::deliverMessage ( const std::shared_ptr< SharedState > & sharedState,
const std::string & evType,
const std::string & payload,
const std::string & evId )
inline static private

Definition at line 160 of file EventSource.h.

163 {
164 const MessageEvent e{evType, payload, evId, sharedState->origin};
165
166 if (auto it = sharedState->onEventListener.find(evType); it != sharedState->onEventListener.end()) {
167 for (const auto& onEventListener : it->second) {
169 }
170 }
171
172 if (evType == "message") {
173 for (const auto& onMessageListener : sharedState->onMessageListener) {
175 }
176 }
177 }

Referenced by web::http::client::tools::EventSourceT< web::http::legacy::in::Client >::dispatch().

Here is the caller graph for this function:

◆ digits()

template<typename Client>
bool web::http::client::tools::EventSourceT< Client >::digits ( std::string_view maybeDigitsAsString)
inline static private

Definition at line 137 of file EventSource.h.

137 {
138 for (const char maybeDigit : maybeDigitsAsString) {
139 if (std::isdigit(static_cast<unsigned char>(maybeDigit)) == 0) {
140 return false;
141 }
142 }
143
144 return !maybeDigitsAsString.empty();
145 }

Referenced by web::http::client::tools::EventSourceT< web::http::legacy::in::Client >::processLine().

Here is the caller graph for this function:

◆ dispatch()

template<typename Client>
void web::http::client::tools::EventSourceT< Client >::dispatch ( const std::shared_ptr< SharedState > & sharedState)
inline static private

Definition at line 179 of file EventSource.h.

179 {
180 sharedState->lastId = sharedState->idBuf;
181 if (sharedState->data.empty()) {
182 sharedState->type.clear();
183 return;
184 }
185 if (!sharedState->data.empty() && sharedState->data.back() == '\n') {
186 sharedState->data.pop_back();
187 }
188
189 const std::string evType = sharedState->type.empty() ? "message" : std::exchange(sharedState->type, std::string{});
191 const std::string evId = sharedState->lastId;
192
194 }
static void deliverMessage(const std::shared_ptr< SharedState > &sharedState, const std::string &evType, const std::string &payload, const std::string &evId)

Referenced by web::http::client::tools::EventSourceT< web::http::legacy::in::Client >::parse().

Here is the caller graph for this function:

◆ init()

template<typename Client>
void web::http::client::tools::EventSourceT< Client >::init ( const std::string & scheme,
const SocketAddress & socketAddress,
const std::string & path )
inline protected

Definition at line 287 of file EventSource.h.

287 {
288 sharedState->path = path;
289 sharedState->origin = scheme + "://" + socketAddress.toString(false);
290
291 LOG(TRACE) << "Origin: " << sharedState->origin;
292 LOG(TRACE) << " Path: " << sharedState->path;
293
295
298 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnConnect";
299
302 }
303 },
305 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnConnected";
306 },
308 LOG(DEBUG) << socketConnection->getConnectionName() << " EventSource: OnDisconnect";
309
311 eventSource->socketConnection = nullptr;
312 }
313
314 if (sharedState->ready != ReadyState::CLOSED) {
315 if (auto it = sharedState->onEventListener.find("error"); it != sharedState->onEventListener.end()) {
316 EventSource::MessageEvent e{"error", "", sharedState->lastId, sharedState->origin};
317
318 for (auto& onError : it->second) {
319 onError(e);
320 }
321 }
322
323 for (const auto& onError : sharedState->onErrorListener) {
324 onError();
325 }
326
328 sharedState->data.clear();
329 sharedState->type.clear();
330 sharedState->idBuf.clear();
331 sharedState->pending.clear();
332
333 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
334 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
335 }
336 },
339 const std::string connectionName = masterRequest->getSocketContext()->getSocketConnection()->getConnectionName();
340
341 LOG(DEBUG) << connectionName << " EventSource: OnRequestStart";
342
343 if (!sharedState->lastId.empty()) {
344 masterRequest->set("Last-Event-ID", sharedState->lastId);
345 }
346
347 if (!masterRequest->requestEventSource(
348 sharedState->path,
350 -> std::size_t {
351 std::size_t consumed = 0;
352
353 if (const std::shared_ptr<MasterRequest> masterRequest = masterRequestWeak.lock()) {
354 char buf[16384];
355 consumed = masterRequest->getSocketContext()->readFromPeer(buf, sizeof(buf));
356
357 sharedState->pending.append(buf, consumed);
358
359 if (!parse(sharedState, sharedConfig)) {
360 masterRequest->getSocketContext()->shutdownWrite(true);
361 }
362 } else {
363 LOG(DEBUG) << connectionName << ": server-sent event: server disconnect";
364 }
365
366 return consumed;
367 },
369 LOG(DEBUG) << connectionName << ": server-sent event stream start";
370
372
373 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
374 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
375
376 if (auto it = sharedState->onEventListener.find("open"); it != sharedState->onEventListener.end()) {
377 EventSource::MessageEvent e{"open", "", sharedState->lastId, sharedState->origin};
378
379 for (auto& onOpen : it->second) {
380 onOpen(e);
381 }
382 }
383
384 for (const auto& onOpen : sharedState->onOpenListener) {
385 onOpen();
386 }
387 },
390 << ": not an server-sent event endpoint: " << sharedState->origin + sharedState->path;
391 })) {
394 }
395 }
396 },
397 [](const std::shared_ptr<Request>& req) {
398 LOG(DEBUG) << req->getSocketContext()->getSocketConnection()->getConnectionName() << " EventSource: OnRequestEnd";
399 });
400
401 client->getConfig().Remote::setSocketAddress(socketAddress);
402 client->getConfig().setReconnect();
403 client->getConfig().setRetry();
404 client->getConfig().setRetryBase(1);
405
406 sharedConfig->config = &client->getConfig();
407
408 client->connect([instanceName = client->getConfig().getInstanceName()](
410 const core::socket::State& state) { // example.com:81 simulate connnect timeout
411 switch (state) {
413 LOG(DEBUG) << instanceName << ": connected to '" << socketAddress.toString() << "'";
414 break;
416 LOG(DEBUG) << instanceName << ": disabled";
417 break;
419 LOG(DEBUG) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
420 break;
422 LOG(DEBUG) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
423 break;
424 }
425 });
426 }
typename Client::SocketConnection SocketConnection
std::shared_ptr< Client > client
EventSource * onOpen(std::function< void()> onOpen)
EventSource * onError(std::function< void()> onError)

Referenced by web::http::legacy::in6::EventSource::EventSource, web::http::legacy::in::EventSource(), web::http::legacy::rc::EventSource::EventSource, web::http::legacy::un::EventSource(), web::http::tls::in6::EventSource(), web::http::tls::in::EventSource(), web::http::tls::rc::EventSource(), and web::http::tls::un::EventSource().

Here is the caller graph for this function:

◆ operator=() [1/2]

template<typename Client>
EventSourceT & web::http::client::tools::EventSourceT< Client >::operator= ( const EventSourceT< Client > & )
delete

◆ operator=() [2/2]

template<typename Client>
EventSourceT & web::http::client::tools::EventSourceT< Client >::operator= ( EventSourceT< Client > && )
delete noexcept

◆ parse()

template<typename Client>
bool web::http::client::tools::EventSourceT< Client >::parse ( const std::shared_ptr< SharedState > & sharedState,
const std::shared_ptr< SharedConfig > & sharedConfig )
inline static private

Definition at line 244 of file EventSource.h.

244 {
245 static constexpr size_t kMaxLine = 256 * 1024;
246
247 bool success = true;
248 bool eol = true;
249 do {
250 const size_t i = sharedState->pending.find_first_of("\r\n");
251
252 eol = i != std::string::npos && (sharedState->pending[i] != '\r' || i + 1 < sharedState->pending.size());
253
254 if (eol && i <= kMaxLine) {
255 const size_t eolLength =
256 (sharedState->pending[i] == '\r' && i + 1 < sharedState->pending.size() && sharedState->pending[i + 1] == '\n') ? 2
257 : 1;
258
259 const std::string_view line(sharedState->pending.data(), i);
260
261 if (!line.empty()) {
263 } else {
265 }
266
267 sharedState->pending.erase(0, i + eolLength);
268 } else {
269 success = sharedState->pending.size() <= kMaxLine; // Line length
270 }
271 } while (eol && success);
272
273 return success;
274 }
static void dispatch(const std::shared_ptr< SharedState > &sharedState)
static bool processLine(const std::shared_ptr< SharedState > &sharedState, const std::shared_ptr< SharedConfig > &sharedConfig, std::string_view line)

Referenced by web::http::client::tools::EventSourceT< web::http::legacy::in::Client >::init().

Here is the caller graph for this function:

◆ parseU32()

template<typename Client>
uint32_t web::http::client::tools::EventSourceT< Client >::parseU32 ( std::string_view uint32AsString)
inline static private

Definition at line 147 of file EventSource.h.

147 {
149
150 for (const char digit : uint32AsString) {
151 uint32AsUint64 = uint32AsUint64 * 10 + (static_cast<unsigned char>(digit) - '0');
152 if (uint32AsUint64 > 0xFFFFFFFFULL) {
153 return 0xFFFFFFFFU;
154 }
155 }
156
157 return static_cast<uint32_t>(uint32AsUint64);
158 }

Referenced by web::http::client::tools::EventSourceT< web::http::legacy::in::Client >::processLine().

Here is the caller graph for this function:

◆ processLine()

template<typename Client>
bool web::http::client::tools::EventSourceT< Client >::processLine ( const std::shared_ptr< SharedState > & sharedState,
const std::shared_ptr< SharedConfig > & sharedConfig,
std::string_view line )
inline static private

Definition at line 196 of file EventSource.h.

198 {
199 bool success = true;
200
201 if (!line.empty() && line.front() != ':') {
204
205 if (auto p = line.find(':'); p != std::string_view::npos) {
206 field = line.substr(0, p);
207 value = line.substr(p + 1);
208 if (!value.empty() && value.front() == ' ') {
209 value.remove_prefix(1);
210 }
211 } else {
212 field = line;
213 }
214
215 if (field == "data") {
216 static constexpr size_t kMaxDataSize = 1 << 20;
217
218 success = sharedState->data.size() + value.size() + 1 <= kMaxDataSize; // Data length
219
220 if (success) {
221 sharedState->data.append(value);
222 sharedState->data.push_back('\n');
223 }
224 } else if (field == "event") {
225 sharedState->type = value;
226 } else if (field == "id") {
227 if (value.find('\0') == std::string_view::npos) {
228 sharedState->idBuf = value;
229 }
230 } else if (field == "retry") {
231 if (digits(value)) {
232 uint32_t const retry = parseU32(value);
233
234 sharedState->retry = retry;
235 sharedConfig->config->setReconnectTime(retry / 1000.);
236 sharedConfig->config->setRetryTimeout(retry / 1000.);
237 }
238 }
239 }
240
241 return success;
242 }
static bool digits(std::string_view maybeDigitsAsString)
static uint32_t parseU32(std::string_view uint32AsString)

Referenced by web::http::client::tools::EventSourceT< web::http::legacy::in::Client >::parse().

Here is the caller graph for this function:

Member Data Documentation

◆ client

template<typename Client>
std::shared_ptr<Client> web::http::client::tools::EventSourceT< Client >::client
private

◆ sharedConfig

◆ socketConnection

template<typename Client>
SocketConnection* web::http::client::tools::EventSourceT< Client >::socketConnection = nullptr
private

The documentation for this class was generated from the following file: