198 std::string_view line) {
201 if (!line.empty() && line.front() !=
':') {
202 std::string_view field;
203 std::string_view value;
205 if (
auto p = line.find(
':'); p != std::string_view::npos) {
206 field = line.substr(0, p);
207 value = line.substr(p + 1);
208 if (!value.empty() && value.front() ==
' ') {
209 value.remove_prefix(1);
215 if (field ==
"data") {
216 static constexpr size_t kMaxDataSize = 1 << 20;
218 success = sharedState->
data.size() + value.size() + 1 <= kMaxDataSize;
221 sharedState->
data.append(value);
222 sharedState->
data.push_back(
'\n');
224 }
else if (field ==
"event") {
225 sharedState->
type = value;
226 }
else if (field ==
"id") {
227 if (value.find(
'\0') == std::string_view::npos) {
228 sharedState->
idBuf = value;
230 }
else if (field ==
"retry") {
234 sharedState->
retry = retry;
235 sharedConfig->config->setReconnectTime(retry / 1000.);
236 sharedConfig->config->setRetryTimeout(retry / 1000.);
287 void init(
const std::string& scheme,
const SocketAddress& socketAddress,
const std::string& path) {
294 const std::weak_ptr<
EventSourceT> eventSourceWeak =
this->weak_from_this();
296 client = std::make_shared<Client>(
298 LOG(DEBUG) << socketConnection->getConnectionName() <<
" EventSource: OnConnect";
300 if (
const std::shared_ptr<
EventSourceT> eventStream = eventSourceWeak.lock()) {
301 eventStream->socketConnection = socketConnection;
305 LOG(DEBUG) << socketConnection->getConnectionName() <<
" EventSource: OnConnected";
307 [eventSourceWeak, sharedState =
this->sharedState, sharedConfig
= this->sharedConfig](
SocketConnection* socketConnection) {
308 LOG(DEBUG) << socketConnection->getConnectionName() <<
" EventSource: OnDisconnect";
310 if (
const std::shared_ptr<
EventSourceT> eventSource = eventSourceWeak.lock()) {
311 eventSource->socketConnection =
nullptr;
315 if (
auto it = sharedState->onEventListener.find(
"error"); it != sharedState->onEventListener.end()) {
318 for (
auto& onError : it->second) {
323 for (
const auto& onError : sharedState->onErrorListener) {
328 sharedState->data.clear();
329 sharedState->type.clear();
330 sharedState->idBuf.clear();
331 sharedState->pending.clear();
333 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
334 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
337 [eventSourceWeak, sharedState =
this->sharedState, sharedConfig
= this->sharedConfig](
339 const std::string connectionName = masterRequest->getSocketContext()->getSocketConnection()->getConnectionName();
341 LOG(DEBUG) << connectionName <<
" EventSource: OnRequestStart";
343 if (!sharedState->lastId.empty()) {
344 masterRequest->set(
"Last-Event-ID", sharedState->lastId);
347 if (!masterRequest->requestEventSource(
349 [masterRequestWeak = std::weak_ptr<MasterRequest>(masterRequest), sharedState, sharedConfig, connectionName]()
351 std::size_t consumed = 0;
353 if (
const std::shared_ptr<
MasterRequest> masterRequest = masterRequestWeak.lock()) {
355 consumed = masterRequest->getSocketContext()->readFromPeer(buf,
sizeof(buf));
357 sharedState->pending.append(buf, consumed);
359 if (!
parse(sharedState
, sharedConfig
)) {
360 masterRequest->getSocketContext()->shutdownWrite(
true);
363 LOG(DEBUG) << connectionName <<
": server-sent event: server disconnect";
368 [sharedState, sharedConfig, connectionName]() {
369 LOG(DEBUG) << connectionName <<
": server-sent event stream start";
373 sharedConfig->config->setReconnectTime(sharedState->retry / 1000.);
374 sharedConfig->config->setRetryTimeout(sharedState->retry / 1000.);
376 if (
auto it = sharedState->onEventListener.find(
"open"); it != sharedState->onEventListener.end()) {
379 for (
auto& onOpen : it->second) {
384 for (
const auto& onOpen : sharedState->onOpenListener) {
388 [sharedState, connectionName]() {
389 LOG(DEBUG) << connectionName
390 <<
": not an server-sent event endpoint: " << sharedState->origin + sharedState->path;
392 if (
const std::shared_ptr<
EventSourceT> eventSource = eventSourceWeak.lock()) {
393 eventSource->socketConnection->close();
397 [](
const std::shared_ptr<
Request>& req) {
398 LOG(DEBUG) << req->getSocketContext()->getSocketConnection()->getConnectionName() <<
" EventSource: OnRequestEnd";
401 client->getConfig().Remote::setSocketAddress(socketAddress);
402 client->getConfig().setReconnect();
403 client->getConfig().setRetry();
404 client->getConfig().setRetryBase(1);
408 client->connect([instanceName =
client->getConfig().getInstanceName()](
410 const core::socket::
State& state) {
413 LOG(DEBUG) << instanceName <<
": connected to '" << socketAddress
.toString() <<
"'";
416 LOG(DEBUG) << instanceName <<
": disabled";
419 LOG(DEBUG) << instanceName <<
": " << socketAddress
.toString() <<
": " << state
.what();
422 LOG(DEBUG) << instanceName <<
": " << socketAddress
.toString() <<
": " << state
.what();