256 const std::string& instanceName,
257 const std::function<
void(
typename HttpClient::Config*)>& configurator) {
258 using SocketAddress =
typename HttpClient::SocketAddress;
260 HttpClient httpClient(
262 [](
const std::shared_ptr<web::http::client::MasterRequest>& req) {
263 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
265 req->set(
"Sec-WebSocket-Protocol",
"mqtt");
270 [connectionName](
bool success) {
271 VLOG(1) << connectionName <<
": HTTP Upgrade (http -> websocket||"
272 <<
"mqtt" <<
") start " << (success ?
"success" :
"failed");
274 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req,
275 [[maybe_unused]]
const std::shared_ptr<web::http::client::Response>& res,
276 [[maybe_unused]]
bool success) {
278 [connectionName]([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req,
const std::string& message) {
279 VLOG(1) << connectionName <<
": Request parse error: " << message;
282 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req) {
283 VLOG(1) <<
"Session ended";
286 configurator(httpClient.getConfig());
288 httpClient.getConfig()->Instance::configurable(
false);
289 httpClient.getConfig()->Remote::configurable(
false);
290 httpClient.getConfig()->setRetry()->setRetryBase(1);
291 httpClient.getConfig()->setReconnect();
293 handleAutoConnectControllers(httpClient.getAutoConnectController());
294 handleConfig(httpClient.getConfig());
297 .setOnInitState([](core::eventreceiver::ConnectEventReceiver* connectEventReceiver) {
300 .connect([instanceName](
const SocketAddress& socketAddress,
const core::socket::State& state) {
301 reportState(instanceName, socketAddress, state);
310 for (
const auto& [bridgeName, bridge] : mqtt::bridge::lib::BridgeStore::instance().getBridgeMap()) {
311 VLOG(0) <<
"Starting bridge: " << bridgeName;
313 if (!bridge.getDisabled()) {
314 mqtt::bridge::lib::SSEDistributor::instance().bridgeStarting(bridgeName);
316 for (
const auto& [fullInstanceName, broker] : bridge.getBrokerMap()) {
317 if (!broker.getDisabled()) {
318 mqtt::bridge::lib::SSEDistributor::instance().brokerConnecting(bridgeName, fullInstanceName);
320 VLOG(1) <<
" Creating broker instance: " << fullInstanceName;
321 VLOG(1) <<
" Broker prefix: " << broker.getPrefix();
322 VLOG(1) <<
" Broker client id: " << broker.getClientId();
323 VLOG(1) <<
" Broker disabled: " << broker.getDisabled();
324 VLOG(1) <<
" Broker address: " << broker.getAddress();
325 VLOG(1) <<
" Broker prefix: " << broker.getPrefix();
326 VLOG(1) <<
" Broker username: " << broker.getUsername();
327 VLOG(1) <<
" Broker password: " << broker.getPassword();
328 VLOG(1) <<
" Broker client-id: " << broker.getClientId();
329 VLOG(1) <<
" Broker clean session: " << broker.getCleanSession();
330 VLOG(1) <<
" Broker will-topic: " << broker.getWillTopic();
331 VLOG(1) <<
" Broker will-message: " << broker.getWillMessage();
332 VLOG(1) <<
" Broker will-qos: " <<
static_cast<
int>(broker.getWillQoS());
333 VLOG(1) <<
" Broker will-retain: " << broker.getWillRetain();
334 VLOG(1) <<
" Broker loop prevention: " << broker.getLoopPrevention();
335 VLOG(1) <<
" Bridge disabled: " << bridge.getDisabled();
336 VLOG(1) <<
" Bridge prefix: " << bridge.getPrefix();
337 VLOG(1) <<
" Bridge Transport: " << broker.getTransport();
338 VLOG(1) <<
" Bridge Protocol: " << broker.getProtocol();
339 VLOG(1) <<
" Bridge Encryption: " << broker.getEncryption();
341 VLOG(1) <<
" Topics:";
342 const std::list<iot::mqtt::Topic>& topics = broker.getTopics();
343 for (
const iot::mqtt::Topic& topic : topics) {
344 VLOG(1) <<
" " << topic.getName() <<
":" <<
static_cast<uint16_t>(topic.getQoS());
347 const std::string& transport = broker.getTransport();
348 const std::string& protocol = broker.getProtocol();
349 const std::string& encryption = broker.getEncryption();
351 if (transport ==
"stream") {
352 if (protocol ==
"in") {
353 if (encryption ==
"legacy") {
354#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
355 startClient<net::in::stream::legacy::SocketClient>(
357 [&broker](net::in::stream::legacy::config::ConfigSocketClient* config) {
358 config->setDisableNagleAlgorithm();
360 config->Remote::setHost(broker.getAddress()[
"host"]);
361 config->Remote::setPort(broker.getAddress()[
"port"]);
363 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
366 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
367 <<
"' not supported.";
369 }
else if (encryption ==
"tls") {
370#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
371 startClient<net::in::stream::tls::SocketClient>(
373 [&broker](net::in::stream::tls::config::ConfigSocketClient* config) {
374 config->setDisableNagleAlgorithm();
376 config->Remote::setHost(broker.getAddress()[
"host"]);
377 config->Remote::setPort(broker.getAddress()[
"port"]);
379 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
382 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
383 <<
"' not supported.";
386 }
else if (protocol ==
"in6") {
387 if (encryption ==
"legacy") {
388#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
389 startClient<net::in6::stream::legacy::SocketClient>(
391 [&broker](net::in6::stream::legacy::config::ConfigSocketClient* config) {
392 config->setDisableNagleAlgorithm();
394 config->Remote::setHost(broker.getAddress()[
"host"]);
395 config->Remote::setPort(broker.getAddress()[
"port"]);
397 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
400 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
401 <<
"' not supported.";
403 }
else if (encryption ==
"tls") {
404#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
405 startClient<net::in6::stream::tls::SocketClient>(
407 [&broker](net::in6::stream::tls::config::ConfigSocketClient* config) {
408 config->setDisableNagleAlgorithm();
410 config->Remote::setHost(broker.getAddress()[
"host"]);
411 config->Remote::setPort(broker.getAddress()[
"port"]);
413 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
416 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
417 <<
"' not supported.";
420 }
else if (protocol ==
"un") {
421 if (encryption ==
"legacy") {
422#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
423 startClient<net::un::stream::legacy::SocketClient>(
425 [&broker](net::un::stream::legacy::config::ConfigSocketClient* config) {
426 config->Remote::setSunPath(broker.getAddress()[
"host"]);
428 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
431 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
432 <<
"' not supported.";
434 }
else if (encryption ==
"tls") {
435#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
436 startClient<net::un::stream::tls::SocketClient>(
438 [&broker](net::un::stream::tls::config::ConfigSocketClient* config) {
439 config->Remote::setSunPath(broker.getAddress()[
"host"]);
441 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
444 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
445 <<
"' not supported.";
449 }
else if (transport ==
"websocket") {
450 if (protocol ==
"in") {
451 if (encryption ==
"legacy") {
452#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
453 startClient<web::http::legacy::in::Client>(
455 [&broker](net::in::stream::legacy::config::ConfigSocketClient* config) {
456 config->setDisableNagleAlgorithm();
458 config->Remote::setHost(broker.getAddress()[
"host"]);
459 config->Remote::setPort(broker.getAddress()[
"port"]);
461 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
464 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
465 <<
"' not supported.";
467 }
else if (encryption ==
"tls") {
468#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
469 startClient<web::http::tls::in::Client>(
471 [&broker](net::in::stream::tls::config::ConfigSocketClient* config) {
472 config->setDisableNagleAlgorithm();
474 config->Remote::setHost(broker.getAddress()[
"host"]);
475 config->Remote::setPort(broker.getAddress()[
"port"]);
477 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
480 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
481 <<
"' not supported.";
484 }
else if (protocol ==
"in6") {
485 if (encryption ==
"legacy") {
486#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
487 startClient<web::http::legacy::in6::Client>(
489 [&broker](net::in6::stream::legacy::config::ConfigSocketClient* config) {
490 config->setDisableNagleAlgorithm();
492 config->Remote::setHost(broker.getAddress()[
"host"]);
493 config->Remote::setPort(broker.getAddress()[
"port"]);
495 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
498 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
499 <<
"' not supported.";
501 }
else if (encryption ==
"tls") {
502#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
503 startClient<web::http::tls::in6::Client>(
505 [&broker](net::in6::stream::tls::config::ConfigSocketClient* config) {
506 config->setDisableNagleAlgorithm();
508 config->Remote::setHost(broker.getAddress()[
"host"]);
509 config->Remote::setPort(broker.getAddress()[
"port"]);
511 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
514 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
515 <<
"' not supported.";
518 }
else if (protocol ==
"un") {
519 if (encryption ==
"legacy") {
520#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
521 startClient<web::http::legacy::un::Client>(
523 [&broker](net::un::stream::legacy::config::ConfigSocketClient* config) {
524 config->Remote::setSunPath(broker.getAddress()[
"path"]);
526 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
529 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
530 <<
"' not supported.";
532 }
else if (encryption ==
"tls") {
533#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
534 startClient<web::http::tls::un::Client>(
536 [&broker](net::un::stream::tls::config::ConfigSocketClient* config) {
537 config->Remote::setSunPath(broker.getAddress()[
"path"]);
539 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
542 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
543 <<
"' not supported.";
548 VLOG(1) <<
" Transport '" << transport <<
"' not supported.";
551 mqtt::bridge::lib::SSEDistributor::instance().brokerDisabled(bridgeName, fullInstanceName);
555 mqtt::bridge::lib::SSEDistributor::instance().bridgeDisabled(bridgeName);
560int main(
int argc,
char* argv[]) {
563 core::SNodeC::init(argc, argv);
565 const express::Router router(
express::middleware::JsonMiddleware());
567 router.get(
"/api/bridge/config", [] APPLICATION(req, res) {
568 res->send(mqtt::bridge::lib::BridgeStore::instance().getBridgesConfigJson().dump(4));
571 router.patch(
"/api/bridge/config", [] APPLICATION(req, res) {
572 req->getAttribute<nlohmann::json>(
573 [&res](nlohmann::json& jsonPatch) {
575 if (mqtt::bridge::lib::BridgeStore::instance().patch(jsonPatch)) {
576 res->send(R"({"success": true, "message": "Bridge config patch applied"})"_json.dump());
578 if (closeBridges()) {
582 res->status(404).send(R"({"success": false, "message": "Bridge config patch failed to applie"})"_json.dump());
585 res->status(409).send(
586 R"({"success": false, "message": "Bridge is in restarting state. Patch not applied"})"_json.dump());
589 [&res](
const std::string& key) {
590 VLOG(1) <<
"Attribute type not found: " << key;
592 res->status(400).send(
"Attribute type not found: " + key);
596 router.get(
"/api/bridge/sse", [] APPLICATION(req, res) {
597 if (web::http::ciContains(req->get(
"Accept"),
"text/event-stream")) {
598 res->set(
"Content-Type",
"text/event-stream")
599 .set(
"Cache-Control",
"no-cache")
600 .set(
"Connection",
"keep-alive");
603 std::string data{
"data"};
606 res->redirect(
"/clients");
610 router.get(
"/", [] APPLICATION(req, res) {
611 res->redirect(
"/config");
614 router.get(
"/config", [] APPLICATION(req, res) {
615 res->redirect(
"/config/index.html");
618 router.use(
"/config",
619 express::middleware::StaticMiddleware(utils::Config::configRoot.getSubCommand<mqtt::
bridge::
ConfigBridge>()->getHtmlDir()));
621 router.get(
"*", [] APPLICATION(req, res) {
622 res->redirect(
"/config/index.html");
629 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
630 config->setPort(8081);
632 config->setReuseAddress();
639 [](net::in::stream::tls::config::ConfigSocketServer* config) {
640 config->setPort(8082);
642 config->setReuseAddress();
646 utils::Config::configRoot.getSubCommand<mqtt::
bridge::
ConfigBridge>()->getDefinitionFile()
)) {
649 VLOG(1) <<
"Loading bridge definition file failed";
652 return core::SNodeC::start();