MQTTSuite
Loading...
Searching...
No Matches
mqttbridge.cpp File Reference
#include "ConfigBridge.h"
#include "SocketContextFactory.h"
#include "config.h"
#include "lib/BridgeStore.h"
#include "lib/Mqtt.h"
#include "lib/SSEDistributor.h"
#include <core/SNodeC.h>
#include <express/legacy/in/Server.h>
#include <express/middleware/JsonMiddleware.h>
#include <express/middleware/StaticMiddleware.h>
#include <express/tls/in/Server.h>
#include <iot/mqtt/MqttContext.h>
#include <net/config/ConfigInstance.h>
#include <utils/Config.h>
Include dependency graph for mqttbridge.cpp:

Go to the source code of this file.

Functions

static void startBridges ()
static void restartBridges ()
static void handleAutoConnectControllers (core::socket::stream::AutoConnectControl *autoConnectController)
static void handleConnector (core::eventreceiver::ConnectEventReceiver *connectEventReceiver)
static void handleConfig (net::config::ConfigInstance *configInstance)
static bool closeBridges ()
static void reportState (const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
template<template< typename SocketContextFactoryT, typename... ArgsT > typename SocketClient>
static SocketClient< mqtt::bridge::SocketContextFactory > startClient (const std::string &instanceName, const std::function< void(typename SocketClient< mqtt::bridge::SocketContextFactory >::Config *)> &configurator)
template<typename HttpClient>
static HttpClient startClient (const std::string &instanceName, const std::function< void(typename HttpClient::Config *)> &configurator)
int main (int argc, char *argv[])

Variables

static std::set< core::eventreceiver::ConnectEventReceiver * > activeConnectors
static std::set< core::socket::stream::AutoConnectControl * > autoConnectControllers
static std::set< const net::config::ConfigInstance * > configInstances
static bool restart = false

Function Documentation

◆ closeBridges()

bool closeBridges ( )
static

Definition at line 179 of file mqttbridge.cpp.

179 {
180 if (!configInstances.empty()) {
182
183 for (const auto& [bridgeName, bridge] : mqtt::bridge::lib::BridgeStore::instance().getBridgeMap()) {
185
186 for (const auto& mqtt : bridge.getMqttList()) {
188 bridgeName, mqtt->getMqttContext()->getSocketConnection()->getInstanceName());
189
190 mqtt->sendDisconnect();
191 }
192 }
193
194 for (auto connectEventReceiver : activeConnectors) {
195 connectEventReceiver->stopConnect();
196 }
197
198 for (auto autoConnectControler : autoConnectControllers) {
199 autoConnectControler->stopReconnectAndRetry();
200 }
201 }
202
203 restart = true;
204
205 return configInstances.empty();
206}
static BridgeStore & instance()
static SSEDistributor & instance()
void brokerDisconnecting(const std::string &bridgeName, const std::string &instanceName)
void bridgeStopping(const std::string &bridgeName)
static std::set< core::eventreceiver::ConnectEventReceiver * > activeConnectors
static std::set< core::socket::stream::AutoConnectControl * > autoConnectControllers
static std::set< const net::config::ConfigInstance * > configInstances
static bool restart

References mqtt::bridge::lib::SSEDistributor::bridgesStopping(), mqtt::bridge::lib::SSEDistributor::instance(), and restart.

Here is the call graph for this function:

◆ handleAutoConnectControllers()

void handleAutoConnectControllers ( core::socket::stream::AutoConnectControl * autoConnectController)
static

Definition at line 141 of file mqttbridge.cpp.

141 {
142 autoConnectControllers.insert(autoConnectController);
143 VLOG(2) << "Added: AutoConnectControl";
144
145 autoConnectController->setOnDestroy([](core::socket::stream::AutoConnectControl* autoConnectController) {
146 autoConnectControllers.erase(autoConnectController);
147 VLOG(2) << "Erased: AutoConnectControl";
148 });
149}

References autoConnectControllers.

◆ handleConfig()

void handleConfig ( net::config::ConfigInstance * configInstance)
static

Definition at line 161 of file mqttbridge.cpp.

161 {
162 configInstances.insert(configInstance);
163 VLOG(2) << "Added: ConfigInstance: " << configInstance->getInstanceName();
164
165 configInstance->setOnDestroy([](const net::config::ConfigInstance* configInstance) {
166 configInstances.erase(configInstance);
167 VLOG(2) << "Erased: ConfigInstance: " << configInstance->getInstanceName();
168
169 if (configInstances.empty()) {
170 VLOG(2) << "All bridges stopped";
171
173
175 }
176 });
177}
static void restartBridges()

References mqtt::bridge::lib::SSEDistributor::bridgesStopped(), configInstances, mqtt::bridge::lib::SSEDistributor::instance(), and restartBridges().

Here is the call graph for this function:

◆ handleConnector()

void handleConnector ( core::eventreceiver::ConnectEventReceiver * connectEventReceiver)
static

Definition at line 151 of file mqttbridge.cpp.

151 {
152 if (connectEventReceiver->isEnabled()) {
153 activeConnectors.insert(connectEventReceiver);
154 VLOG(2) << "Added: ConnectEventReceiver";
155 } else {
156 activeConnectors.erase(connectEventReceiver);
157 VLOG(2) << "Erased: ConnectEventReceiver";
158 }
159}

References activeConnectors.

Referenced by startClient() [1/2], and startClient() [2/2].

Here is the caller graph for this function:

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 560 of file mqttbridge.cpp.

560 {
561 utils::Config::configRoot.newSubCommand<mqtt::bridge::ConfigBridge>();
562
563 core::SNodeC::init(argc, argv);
564
565 const express::Router router(express::middleware::JsonMiddleware());
566
567 router.get("/api/bridge/config", [] APPLICATION(req, res) { // cppcheck-suppress unknownMacro
568 res->send(mqtt::bridge::lib::BridgeStore::instance().getBridgesConfigJson().dump(4));
569 });
570
571 router.patch("/api/bridge/config", [] APPLICATION(req, res) {
572 req->getAttribute<nlohmann::json>(
573 [&res](nlohmann::json& jsonPatch) {
574 if (!restart) {
575 if (mqtt::bridge::lib::BridgeStore::instance().patch(jsonPatch)) {
576 res->send(R"({"success": true, "message": "Bridge config patch applied"})"_json.dump());
577
578 if (closeBridges()) {
580 }
581 } else {
582 res->status(404).send(R"({"success": false, "message": "Bridge config patch failed to applie"})"_json.dump());
583 }
584 } else {
585 res->status(409).send(
586 R"({"success": false, "message": "Bridge is in restarting state. Patch not applied"})"_json.dump());
587 }
588 },
589 [&res](const std::string& key) {
590 VLOG(1) << "Attribute type not found: " << key;
591
592 res->status(400).send("Attribute type not found: " + key);
593 });
594 });
595
596 router.get("/api/bridge/sse", [] APPLICATION(req, res) {
597 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
598 res->set("Content-Type", "text/event-stream") //
599 .set("Cache-Control", "no-cache")
600 .set("Connection", "keep-alive");
601 res->sendHeader();
602
603 std::string data{"data"};
604 mqtt::bridge::lib::SSEDistributor::instance().addEventReceiver(res, req->get("Last-Event-ID"));
605 } else {
606 res->redirect("/clients");
607 }
608 });
609
610 router.get("/", [] APPLICATION(req, res) {
611 res->redirect("/config");
612 });
613
614 router.get("/config", [] APPLICATION(req, res) {
615 res->redirect("/config/index.html");
616 });
617
618 router.use("/config",
619 express::middleware::StaticMiddleware(utils::Config::configRoot.getSubCommand<mqtt::bridge::ConfigBridge>()->getHtmlDir()));
620
621 router.get("*", [] APPLICATION(req, res) {
622 res->redirect("/config/index.html");
623 });
624
625 express::legacy::in::Server( //
626 "admin-legacy",
627 router,
628 reportState, //
629 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
630 config->setPort(8081);
631 config->setRetry();
632 config->setReuseAddress();
633 });
634
635 express::tls::in::Server( //
636 "admin-tls",
637 router,
639 [](net::in::stream::tls::config::ConfigSocketServer* config) {
640 config->setPort(8082);
641 config->setRetry();
642 config->setReuseAddress();
643 });
644
645 if (mqtt::bridge::lib::BridgeStore::instance().loadAndValidate(
646 utils::Config::configRoot.getSubCommand<mqtt::bridge::ConfigBridge>()->getDefinitionFile())) {
647 startBridges();
648 } else {
649 VLOG(1) << "Loading bridge definition file failed";
650 }
651
652 return core::SNodeC::start();
653}
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId)
static bool closeBridges()
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static void startBridges()

References mqtt::bridge::lib::SSEDistributor::addEventReceiver(), mqtt::bridge::lib::BridgeStore::instance(), mqtt::bridge::lib::SSEDistributor::instance(), mqtt::bridge::lib::BridgeStore::loadAndValidate(), reportState(), and startBridges().

Here is the call graph for this function:

◆ reportState()

void reportState ( const std::string & instanceName,
const core::socket::SocketAddress & socketAddress,
const core::socket::State & state )
static

Definition at line 209 of file mqttbridge.cpp.

209 {
210 switch (state) {
211 case core::socket::State::OK:
212 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
213 break;
214 case core::socket::State::DISABLED:
215 VLOG(1) << instanceName << ": disabled";
216 break;
217 case core::socket::State::ERROR:
218 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
219 break;
220 case core::socket::State::FATAL:
221 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
222 break;
223 }
224}

Referenced by main().

Here is the caller graph for this function:

◆ restartBridges()

void restartBridges ( )
static

Definition at line 125 of file mqttbridge.cpp.

125 {
126 if (restart) {
127 VLOG(2) << "Restarting bridges...";
128
130
131 startBridges();
132
133 utils::Config::parse();
134
135 restart = false;
136 } else {
137 VLOG(2) << "No bridge restarted";
138 }
139}

References mqtt::bridge::lib::BridgeStore::activateStaged(), mqtt::bridge::lib::BridgeStore::instance(), restart, and startBridges().

Referenced by handleConfig().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ startBridges()

void startBridges ( )
static

Definition at line 307 of file mqttbridge.cpp.

307 {
309
310 for (const auto& [bridgeName, bridge] : mqtt::bridge::lib::BridgeStore::instance().getBridgeMap()) {
311 VLOG(0) << "Starting bridge: " << bridgeName;
312
313 if (!bridge.getDisabled()) {
315
316 for (const auto& [fullInstanceName, broker] : bridge.getBrokerMap()) {
317 if (!broker.getDisabled()) {
319
320 VLOG(1) << " Creating broker instance: " << fullInstanceName;
321 VLOG(1) << " Broker prefix: " << broker.getPrefix();
322 VLOG(1) << " Broker client id: " << broker.getClientId();
323 VLOG(1) << " Broker disabled: " << broker.getDisabled();
324 VLOG(1) << " Broker address: " << broker.getAddress();
325 VLOG(1) << " Broker prefix: " << broker.getPrefix();
326 VLOG(1) << " Broker username: " << broker.getUsername();
327 VLOG(1) << " Broker password: " << broker.getPassword();
328 VLOG(1) << " Broker client-id: " << broker.getClientId();
329 VLOG(1) << " Broker clean session: " << broker.getCleanSession();
330 VLOG(1) << " Broker will-topic: " << broker.getWillTopic();
331 VLOG(1) << " Broker will-message: " << broker.getWillMessage();
332 VLOG(1) << " Broker will-qos: " << static_cast<int>(broker.getWillQoS());
333 VLOG(1) << " Broker will-retain: " << broker.getWillRetain();
334 VLOG(1) << " Broker loop prevention: " << broker.getLoopPrevention();
335 VLOG(1) << " Bridge disabled: " << bridge.getDisabled();
336 VLOG(1) << " Bridge prefix: " << bridge.getPrefix();
337 VLOG(1) << " Bridge Transport: " << broker.getTransport();
338 VLOG(1) << " Bridge Protocol: " << broker.getProtocol();
339 VLOG(1) << " Bridge Encryption: " << broker.getEncryption();
340
341 VLOG(1) << " Topics:";
342 const std::list<iot::mqtt::Topic>& topics = broker.getTopics();
343 for (const iot::mqtt::Topic& topic : topics) {
344 VLOG(1) << " " << topic.getName() << ":" << static_cast<uint16_t>(topic.getQoS());
345 }
346
347 const std::string& transport = broker.getTransport();
348 const std::string& protocol = broker.getProtocol();
349 const std::string& encryption = broker.getEncryption();
350
351 if (transport == "stream") {
352 if (protocol == "in") {
353 if (encryption == "legacy") {
354#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
356 fullInstanceName,
357 [&broker](net::in::stream::legacy::config::ConfigSocketClient* config) {
358 config->setDisableNagleAlgorithm();
359
360 config->Remote::setHost(broker.getAddress()["host"]);
361 config->Remote::setPort(broker.getAddress()["port"]);
362
363 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
364 });
365#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4
366 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
367 << "' not supported.";
368#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4
369 } else if (encryption == "tls") {
370#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
372 fullInstanceName,
373 [&broker](net::in::stream::tls::config::ConfigSocketClient* config) {
374 config->setDisableNagleAlgorithm();
375
376 config->Remote::setHost(broker.getAddress()["host"]);
377 config->Remote::setPort(broker.getAddress()["port"]);
378
379 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
380 });
381#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4
382 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
383 << "' not supported.";
384#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4
385 }
386 } else if (protocol == "in6") {
387 if (encryption == "legacy") {
388#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
390 fullInstanceName,
391 [&broker](net::in6::stream::legacy::config::ConfigSocketClient* config) {
392 config->setDisableNagleAlgorithm();
393
394 config->Remote::setHost(broker.getAddress()["host"]);
395 config->Remote::setPort(broker.getAddress()["port"]);
396
397 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
398 });
399#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6
400 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
401 << "' not supported.";
402#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6
403 } else if (encryption == "tls") {
404#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
406 fullInstanceName,
407 [&broker](net::in6::stream::tls::config::ConfigSocketClient* config) {
408 config->setDisableNagleAlgorithm();
409
410 config->Remote::setHost(broker.getAddress()["host"]);
411 config->Remote::setPort(broker.getAddress()["port"]);
412
413 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
414 });
415#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6
416 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
417 << "' not supported.";
418#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6
419 }
420 } else if (protocol == "un") {
421 if (encryption == "legacy") {
422#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
424 fullInstanceName,
425 [&broker](net::un::stream::legacy::config::ConfigSocketClient* config) {
426 config->Remote::setSunPath(broker.getAddress()["host"]);
427
428 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
429 });
430#else // CONFIG_MQTTSUITE_BRIDGE_UNIX
431 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
432 << "' not supported.";
433#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX
434 } else if (encryption == "tls") {
435#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
437 fullInstanceName,
438 [&broker](net::un::stream::tls::config::ConfigSocketClient* config) {
439 config->Remote::setSunPath(broker.getAddress()["host"]);
440
441 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
442 });
443#else // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS
444 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
445 << "' not supported.";
446#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS
447 }
448 }
449 } else if (transport == "websocket") {
450 if (protocol == "in") {
451 if (encryption == "legacy") {
452#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
454 fullInstanceName,
455 [&broker](net::in::stream::legacy::config::ConfigSocketClient* config) {
456 config->setDisableNagleAlgorithm();
457
458 config->Remote::setHost(broker.getAddress()["host"]);
459 config->Remote::setPort(broker.getAddress()["port"]);
460
461 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
462 });
463#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WS
464 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
465 << "' not supported.";
466#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WS
467 } else if (encryption == "tls") {
468#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
470 fullInstanceName,
471 [&broker](net::in::stream::tls::config::ConfigSocketClient* config) {
472 config->setDisableNagleAlgorithm();
473
474 config->Remote::setHost(broker.getAddress()["host"]);
475 config->Remote::setPort(broker.getAddress()["port"]);
476
477 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
478 });
479#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WSS
480 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
481 << "' not supported.";
482#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WSS
483 }
484 } else if (protocol == "in6") {
485 if (encryption == "legacy") {
486#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
488 fullInstanceName,
489 [&broker](net::in6::stream::legacy::config::ConfigSocketClient* config) {
490 config->setDisableNagleAlgorithm();
491
492 config->Remote::setHost(broker.getAddress()["host"]);
493 config->Remote::setPort(broker.getAddress()["port"]);
494
495 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
496 });
497#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WS
498 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
499 << "' not supported.";
500#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6&& CONFIG_MQTTSUITE_BRIDGE_WS
501 } else if (encryption == "tls") {
502#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
504 fullInstanceName,
505 [&broker](net::in6::stream::tls::config::ConfigSocketClient* config) {
506 config->setDisableNagleAlgorithm();
507
508 config->Remote::setHost(broker.getAddress()["host"]);
509 config->Remote::setPort(broker.getAddress()["port"]);
510
511 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
512 });
513#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WSS
514 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
515 << "' not supported.";
516#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WSS
517 }
518 } else if (protocol == "un") {
519 if (encryption == "legacy") {
520#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
522 fullInstanceName,
523 [&broker](net::un::stream::legacy::config::ConfigSocketClient* config) {
524 config->Remote::setSunPath(broker.getAddress()["path"]);
525
526 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
527 });
528#else // CONFIG_MQTTSUITE_BRIDGE_UNIX && CONFIG_MQTTSUITE_BRIDGE_WS
529 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
530 << "' not supported.";
531#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX && CONFIG_MQTTSUITE_BRIDGE_WS
532 } else if (encryption == "tls") {
533#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
535 fullInstanceName,
536 [&broker](net::un::stream::tls::config::ConfigSocketClient* config) {
537 config->Remote::setSunPath(broker.getAddress()["path"]);
538
539 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
540 });
541#else // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS && CONFIG_MQTTSUITE_BRIDGE_WSS
542 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
543 << "' not supported.";
544#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS && CONFIG_MQTTSUITE_BRIDGE_WSS
545 }
546 }
547 } else {
548 VLOG(1) << " Transport '" << transport << "' not supported.";
549 }
550 } else {
551 mqtt::bridge::lib::SSEDistributor::instance().brokerDisabled(bridgeName, fullInstanceName);
552 }
553 }
554 } else {
556 }
557 }
558}
void brokerConnecting(const std::string &bridgeName, const std::string &instanceName)
void brokerDisabled(const std::string &bridgeName, const std::string &instanceName)
void bridgeDisabled(const std::string &bridgeName)
void bridgeStarting(const std::string &bridgeName)
static SocketClient< mqtt::bridge::SocketContextFactory > startClient(const std::string &instanceName, const std::function< void(typename SocketClient< mqtt::bridge::SocketContextFactory >::Config *)> &configurator)

References mqtt::bridge::lib::SSEDistributor::bridgesStarting(), and mqtt::bridge::lib::SSEDistributor::instance().

Referenced by main(), and restartBridges().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ startClient() [1/2]

template<typename HttpClient>
HttpClient startClient ( const std::string & instanceName,
const std::function< void(typename HttpClient::Config *)> & configurator )
static

Definition at line 255 of file mqttbridge.cpp.

257 {
258 using SocketAddress = typename HttpClient::SocketAddress;
259
260 HttpClient httpClient(
261 instanceName,
262 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
263 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
264
265 req->set("Sec-WebSocket-Protocol", "mqtt");
266
267 req->upgrade(
268 "/ws",
269 "websocket",
270 [connectionName](bool success) {
271 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
272 << "mqtt" << ") start " << (success ? "success" : "failed");
273 },
274 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
275 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
276 [[maybe_unused]] bool success) {
277 },
278 [connectionName]([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req, const std::string& message) {
279 VLOG(1) << connectionName << ": Request parse error: " << message;
280 });
281 },
282 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
283 VLOG(1) << "Session ended";
284 });
285
286 configurator(httpClient.getConfig());
287
288 httpClient.getConfig()->Instance::configurable(false);
289 httpClient.getConfig()->Remote::configurable(false);
290 httpClient.getConfig()->setRetry()->setRetryBase(1);
291 httpClient.getConfig()->setReconnect();
292
293 handleAutoConnectControllers(httpClient.getAutoConnectController());
294 handleConfig(httpClient.getConfig());
295
296 httpClient
297 .setOnInitState([](core::eventreceiver::ConnectEventReceiver* connectEventReceiver) {
298 handleConnector(connectEventReceiver);
299 })
300 .connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
301 reportState(instanceName, socketAddress, state);
302 });
303
304 return httpClient;
305}
static void handleAutoConnectControllers(core::socket::stream::AutoConnectControl *autoConnectController)
static void handleConnector(core::eventreceiver::ConnectEventReceiver *connectEventReceiver)
static void handleConfig(net::config::ConfigInstance *configInstance)

References handleConnector().

Here is the call graph for this function:

◆ startClient() [2/2]

template<template< typename SocketContextFactoryT, typename... ArgsT > typename SocketClient>
SocketClient< mqtt::bridge::SocketContextFactory > startClient ( const std::string & instanceName,
const std::function< void(typename SocketClient< mqtt::bridge::SocketContextFactory >::Config *)> & configurator )
static

Definition at line 227 of file mqttbridge.cpp.

229 {
230 using Client = SocketClient<mqtt::bridge::SocketContextFactory>;
231 using SocketAddress = typename Client::SocketAddress;
232
233 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator);
234
235 socketClient.getConfig()->Instance::configurable(false);
236 socketClient.getConfig()->Remote::configurable(false);
237 socketClient.getConfig()->setRetry()->setRetryBase(1);
238 socketClient.getConfig()->setReconnect();
239
240 handleAutoConnectControllers(socketClient.getAutoConnectController());
241 handleConfig(socketClient.getConfig());
242
243 socketClient
244 .setOnInitState([](core::eventreceiver::ConnectEventReceiver* connectEventReceiver) {
245 handleConnector(connectEventReceiver);
246 })
247 .connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
248 reportState(instanceName, socketAddress, state);
249 });
250
251 return socketClient;
252}

References handleConnector().

Here is the call graph for this function:

Variable Documentation

◆ activeConnectors

std::set< core::eventreceiver::ConnectEventReceiver * > activeConnectors
static

Definition at line 117 of file mqttbridge.cpp.

Referenced by handleConnector().

◆ autoConnectControllers

std::set< core::socket::stream::AutoConnectControl * > autoConnectControllers
static

Definition at line 118 of file mqttbridge.cpp.

Referenced by handleAutoConnectControllers().

◆ configInstances

std::set< const net::config::ConfigInstance * > configInstances
static

Definition at line 119 of file mqttbridge.cpp.

Referenced by handleConfig().

◆ restart

bool restart = false
static

Definition at line 121 of file mqttbridge.cpp.

Referenced by closeBridges(), and restartBridges().