MQTTSuite
Loading...
Searching...
No Matches
mqttbridge.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "ConfigBridge.h"
44#include "config.h"
45#include "lib/BridgeStore.h"
46#include "lib/Mqtt.h"
47#include "lib/SSEDistributor.h"
48
49#include <core/SNodeC.h>
50#include <express/legacy/in/Server.h>
51#include <express/middleware/JsonMiddleware.h>
52#include <express/middleware/StaticMiddleware.h>
53#include <express/tls/in/Server.h>
54#include <iot/mqtt/MqttContext.h>
55#include <net/config/ConfigInstance.h>
56#include <utils/Config.h>
57//
58
59#ifndef DOXYGEN_SHOULD_SKIP_THIS
60
61//
62#include <log/Logger.h>
63//
64#include <nlohmann/json_fwd.hpp>
65//
66
67// Select necessary include files
68// ==============================
69#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
70#include <net/in/stream/legacy/SocketClient.h>
71#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
72#include <net/in/stream/tls/SocketClient.h>
73#endif
74#endif
75
76#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
77#include <net/in6/stream/legacy/SocketClient.h>
78#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
79#include <net/in6/stream/tls/SocketClient.h>
80#endif
81#endif
82
83#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
84#include <net/un/stream/legacy/SocketClient.h>
85#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
86#include <net/un/stream/tls/SocketClient.h>
87#endif
88#endif
89
90#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
91#include <web/http/legacy/in/Client.h>
92#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
93#include <web/http/tls/in/Client.h>
94#endif
95#endif
96
97#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
98#include <web/http/legacy/in6/Client.h>
99#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
100#include <web/http/tls/in6/Client.h>
101#endif
102#endif
103
104#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
105#include <web/http/legacy/un/Client.h>
106#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
107#include <web/http/tls/un/Client.h>
108#endif
109#endif
110
111#include <list>
112#include <nlohmann/json.hpp>
113#include <set>
114
115#endif
116
120
// Restart-in-progress flag: set to true by closeBridges() and reset to false by
// restartBridges(). While it is true the PATCH handler in main() answers 409.
121static bool restart = false;
122
// Forward declaration; the definition appears further down in this file.
123static void startBridges();
124
// Completes a pending restart: when the restart flag is set, re-parses the
// configuration via utils::Config::parse() and clears the flag; otherwise only
// logs that nothing was restarted.
// NOTE(review): listing lines 129 and 131 are absent from this extract, so the
// statements between the log message and utils::Config::parse() are not visible
// here - confirm against the real source before editing this function.
125static void restartBridges() {
126 if (restart) {
127 VLOG(2) << "Restarting bridges...";
128
130
132
133 utils::Config::parse();
134
// Clearing the flag re-enables the PATCH endpoint, which checks `restart`.
135 restart = false;
136 } else {
137 VLOG(2) << "No bridge restarted";
138 }
139}
140
141static void handleAutoConnectControllers(core::socket::stream::AutoConnectControl* autoConnectController) {
142 autoConnectControllers.insert(autoConnectController);
143 VLOG(2) << "Added: AutoConnectControl";
144
145 autoConnectController->setOnDestroy([](core::socket::stream::AutoConnectControl* autoConnectController) {
146 autoConnectControllers.erase(autoConnectController);
147 VLOG(2) << "Erased: AutoConnectControl";
148 });
149}
150
151static void handleConnector(core::eventreceiver::ConnectEventReceiver* connectEventReceiver) {
152 if (connectEventReceiver->isEnabled()) {
153 activeConnectors.insert(connectEventReceiver);
154 VLOG(2) << "Added: ConnectEventReceiver";
155 } else {
156 activeConnectors.erase(connectEventReceiver);
157 VLOG(2) << "Erased: ConnectEventReceiver";
158 }
159}
160
// Registers a ConfigInstance in the file-level configInstances set and installs
// an on-destroy hook that erases it again. When the last instance has been
// erased the hook logs "All bridges stopped".
// NOTE(review): listing lines 172 and 174 are absent from this extract, so any
// further action taken once all bridges have stopped is not visible here.
161static void handleConfig(net::config::ConfigInstance* configInstance) {
162 configInstances.insert(configInstance);
163 VLOG(2) << "Added: ConfigInstance: " << configInstance->getInstanceName();
164
// The lambda parameter shadows the outer parameter of the same name; inside the
// hook it refers to the instance being destroyed.
165 configInstance->setOnDestroy([](const net::config::ConfigInstance* configInstance) {
166 configInstances.erase(configInstance);
167 VLOG(2) << "Erased: ConfigInstance: " << configInstance->getInstanceName();
168
169 if (configInstances.empty()) {
170 VLOG(2) << "All bridges stopped";
171
173
175 }
176 });
177}
178
// Initiates shutdown of all running bridges and marks a restart as pending.
//
// If any ConfigInstance is still registered, every bridge in the BridgeStore is
// announced as stopping via the SSEDistributor, each of its Mqtt objects is
// announced as disconnecting and asked to send a disconnect, all active
// connectors are told to stop connecting, and all auto-connect controllers are
// told to stop reconnect/retry.
//
// The restart flag is set unconditionally.
//
// Returns true when configInstances is empty at the time of return; the PATCH
// handler in main() calls restartBridges() directly in that case.
// NOTE(review): listing line 181 is absent from this extract; the first
// statement inside the `if` is therefore not visible here.
179static bool closeBridges() {
180 if (!configInstances.empty()) {
182
183 for (const auto& [bridgeName, bridge] : mqtt::bridge::lib::BridgeStore::instance().getBridgeMap()) {
184 mqtt::bridge::lib::SSEDistributor::instance().bridgeStopping(bridgeName);
185
186 for (const auto& mqtt : bridge.getMqttList()) {
// The broker is identified to SSE listeners by the instance name of the
// socket connection behind the Mqtt context.
187 mqtt::bridge::lib::SSEDistributor::instance().brokerDisconnecting(
188 bridgeName, mqtt->getMqttContext()->getSocketConnection()->getInstanceName());
189
190 mqtt->sendDisconnect();
191 }
192 }
193
// Abort connection attempts that are still in flight.
194 for (auto connectEventReceiver : activeConnectors) {
195 connectEventReceiver->stopConnect();
196 }
197
// Stop reconnect/retry on every registered auto-connect controller.
198 for (auto autoConnectControler : autoConnectControllers) {
199 autoConnectControler->stopReconnectAndRetry();
200 }
201 }
202
203 restart = true;
204
205 return configInstances.empty();
206}
207
208static void
209reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
210 switch (state) {
211 case core::socket::State::OK:
212 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
213 break;
214 case core::socket::State::DISABLED:
215 VLOG(1) << instanceName << ": disabled";
216 break;
217 case core::socket::State::ERROR:
218 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
219 break;
220 case core::socket::State::FATAL:
221 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
222 break;
223 }
224}
225
226template <template <typename SocketContextFactoryT, typename... ArgsT> typename SocketClient>
227static SocketClient<mqtt::bridge::SocketContextFactory> startClient( //
228 const std::string& instanceName,
229 const std::function<void(typename SocketClient<mqtt::bridge::SocketContextFactory>::Config*)>& configurator) {
230 using Client = SocketClient<mqtt::bridge::SocketContextFactory>;
231 using SocketAddress = typename Client::SocketAddress;
232
233 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator);
234
235 socketClient.getConfig()->Instance::configurable(false);
236 socketClient.getConfig()->Remote::configurable(false);
237 socketClient.getConfig()->setRetry()->setRetryBase(1);
238 socketClient.getConfig()->setReconnect();
239
240 handleAutoConnectControllers(socketClient.getAutoConnectController());
241 handleConfig(socketClient.getConfig());
242
243 socketClient
244 .setOnInitState([](core::eventreceiver::ConnectEventReceiver* connectEventReceiver) {
245 handleConnector(connectEventReceiver);
246 })
247 .connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
248 reportState(instanceName, socketAddress, state);
249 });
250
251 return socketClient;
252}
253
254template <typename HttpClient>
255static HttpClient startClient( //
256 const std::string& instanceName,
257 const std::function<void(typename HttpClient::Config*)>& configurator) {
258 using SocketAddress = typename HttpClient::SocketAddress;
259
260 HttpClient httpClient(
261 instanceName,
262 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
263 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
264
265 req->set("Sec-WebSocket-Protocol", "mqtt");
266
267 req->upgrade(
268 "/ws",
269 "websocket",
270 [connectionName](bool success) {
271 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
272 << "mqtt" << ") start " << (success ? "success" : "failed");
273 },
274 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
275 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
276 [[maybe_unused]] bool success) {
277 },
278 [connectionName]([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req, const std::string& message) {
279 VLOG(1) << connectionName << ": Request parse error: " << message;
280 });
281 },
282 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
283 VLOG(1) << "Session ended";
284 });
285
286 configurator(httpClient.getConfig());
287
288 httpClient.getConfig()->Instance::configurable(false);
289 httpClient.getConfig()->Remote::configurable(false);
290 httpClient.getConfig()->setRetry()->setRetryBase(1);
291 httpClient.getConfig()->setReconnect();
292
293 handleAutoConnectControllers(httpClient.getAutoConnectController());
294 handleConfig(httpClient.getConfig());
295
296 httpClient
297 .setOnInitState([](core::eventreceiver::ConnectEventReceiver* connectEventReceiver) {
298 handleConnector(connectEventReceiver);
299 })
300 .connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
301 reportState(instanceName, socketAddress, state);
302 });
303
304 return httpClient;
305}
306
// Starts one client per enabled broker of every enabled bridge in the
// BridgeStore.
//
// Per bridge: logs "Starting bridge", then either reports bridgeDisabled to the
// SSEDistributor or reports bridgeStarting and walks the bridge's broker map.
// Per broker: a disabled broker is reported via brokerDisabled; otherwise
// brokerConnecting is reported, the broker settings and topics are logged at
// verbosity 1, and the (transport, protocol, encryption) triple selects the
// startClient instantiation to use. Each instantiation is guarded by the
// matching CONFIG_MQTTSUITE_BRIDGE_* macro(s); when the macro is not defined a
// "not supported" message is logged instead.
//
// NOTE(review): listing line 308 is absent from this extract; a statement
// before the bridge loop may be missing here.
// NOTE(review): "Starting bridge" is logged before the disabled check, so it
// also appears for disabled bridges.
// NOTE(review): an unknown protocol or encryption value matches no branch and
// is skipped without any log; only an unknown transport is reported.
307static void startBridges() {
309
310 for (const auto& [bridgeName, bridge] : mqtt::bridge::lib::BridgeStore::instance().getBridgeMap()) {
311 VLOG(0) << "Starting bridge: " << bridgeName;
312
313 if (!bridge.getDisabled()) {
314 mqtt::bridge::lib::SSEDistributor::instance().bridgeStarting(bridgeName);
315
316 for (const auto& [fullInstanceName, broker] : bridge.getBrokerMap()) {
317 if (!broker.getDisabled()) {
318 mqtt::bridge::lib::SSEDistributor::instance().brokerConnecting(bridgeName, fullInstanceName);
319
// Diagnostic dump of the broker definition.
// NOTE(review): prefix and client id are each logged twice below.
// NOTE(review): the broker password is written to the log in clear text -
// consider masking or removing that line.
320 VLOG(1) << " Creating broker instance: " << fullInstanceName;
321 VLOG(1) << " Broker prefix: " << broker.getPrefix();
322 VLOG(1) << " Broker client id: " << broker.getClientId();
323 VLOG(1) << " Broker disabled: " << broker.getDisabled();
324 VLOG(1) << " Broker address: " << broker.getAddress();
325 VLOG(1) << " Broker prefix: " << broker.getPrefix();
326 VLOG(1) << " Broker username: " << broker.getUsername();
327 VLOG(1) << " Broker password: " << broker.getPassword();
328 VLOG(1) << " Broker client-id: " << broker.getClientId();
329 VLOG(1) << " Broker clean session: " << broker.getCleanSession();
330 VLOG(1) << " Broker will-topic: " << broker.getWillTopic();
331 VLOG(1) << " Broker will-message: " << broker.getWillMessage();
332 VLOG(1) << " Broker will-qos: " << static_cast<int>(broker.getWillQoS());
333 VLOG(1) << " Broker will-retain: " << broker.getWillRetain();
334 VLOG(1) << " Broker loop prevention: " << broker.getLoopPrevention();
335 VLOG(1) << " Bridge disabled: " << bridge.getDisabled();
336 VLOG(1) << " Bridge prefix: " << bridge.getPrefix();
337 VLOG(1) << " Bridge Transport: " << broker.getTransport();
338 VLOG(1) << " Bridge Protocol: " << broker.getProtocol();
339 VLOG(1) << " Bridge Encryption: " << broker.getEncryption();
340
341 VLOG(1) << " Topics:";
342 const std::list<iot::mqtt::Topic>& topics = broker.getTopics();
343 for (const iot::mqtt::Topic& topic : topics) {
344 VLOG(1) << " " << topic.getName() << ":" << static_cast<uint16_t>(topic.getQoS());
345 }
346
// The three selectors that pick the client type below.
347 const std::string& transport = broker.getTransport();
348 const std::string& protocol = broker.getProtocol();
349 const std::string& encryption = broker.getEncryption();
350
// --- transport "stream": MQTT directly over a stream socket ---
// NOTE(review): every configurator below passes
// `broker.getDisabled() || broker.getBridge().getDisabled()` to setDisabled();
// this code is only reached when bridge.getDisabled() and broker.getDisabled()
// were both false above - presumably broker.getBridge() is that same bridge;
// confirm.
351 if (transport == "stream") {
352 if (protocol == "in") {
353 if (encryption == "legacy") {
354#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
355 startClient<net::in::stream::legacy::SocketClient>( //
356 fullInstanceName,
357 [&broker](net::in::stream::legacy::config::ConfigSocketClient* config) {
358 config->setDisableNagleAlgorithm();
359
360 config->Remote::setHost(broker.getAddress()["host"]);
361 config->Remote::setPort(broker.getAddress()["port"]);
362
363 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
364 });
365#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4
366 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
367 << "' not supported.";
368#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4
369 } else if (encryption == "tls") {
370#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
371 startClient<net::in::stream::tls::SocketClient>( //
372 fullInstanceName,
373 [&broker](net::in::stream::tls::config::ConfigSocketClient* config) {
374 config->setDisableNagleAlgorithm();
375
376 config->Remote::setHost(broker.getAddress()["host"]);
377 config->Remote::setPort(broker.getAddress()["port"]);
378
379 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
380 });
381#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4
382 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
383 << "' not supported.";
384#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4
385 }
386 } else if (protocol == "in6") {
387 if (encryption == "legacy") {
388#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
389 startClient<net::in6::stream::legacy::SocketClient>( //
390 fullInstanceName,
391 [&broker](net::in6::stream::legacy::config::ConfigSocketClient* config) {
392 config->setDisableNagleAlgorithm();
393
394 config->Remote::setHost(broker.getAddress()["host"]);
395 config->Remote::setPort(broker.getAddress()["port"]);
396
397 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
398 });
399#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6
400 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
401 << "' not supported.";
402#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6
403 } else if (encryption == "tls") {
404#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
405 startClient<net::in6::stream::tls::SocketClient>( //
406 fullInstanceName,
407 [&broker](net::in6::stream::tls::config::ConfigSocketClient* config) {
408 config->setDisableNagleAlgorithm();
409
410 config->Remote::setHost(broker.getAddress()["host"]);
411 config->Remote::setPort(broker.getAddress()["port"]);
412
413 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
414 });
415#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6
416 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
417 << "' not supported.";
418#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6
419 }
420 } else if (protocol == "un") {
// NOTE(review): the unix stream branches read the socket path from address key
// "host", while the unix websocket branches further down read key "path" -
// one of the two is likely wrong; confirm against the bridge config schema.
421 if (encryption == "legacy") {
422#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
423 startClient<net::un::stream::legacy::SocketClient>( //
424 fullInstanceName,
425 [&broker](net::un::stream::legacy::config::ConfigSocketClient* config) {
426 config->Remote::setSunPath(broker.getAddress()["host"]);
427
428 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
429 });
430#else // CONFIG_MQTTSUITE_BRIDGE_UNIX
431 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
432 << "' not supported.";
433#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX
434 } else if (encryption == "tls") {
435#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
436 startClient<net::un::stream::tls::SocketClient>( //
437 fullInstanceName,
438 [&broker](net::un::stream::tls::config::ConfigSocketClient* config) {
439 config->Remote::setSunPath(broker.getAddress()["host"]);
440
441 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
442 });
443#else // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS
444 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
445 << "' not supported.";
446#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS
447 }
448 }
// --- transport "websocket": MQTT over WebSocket via the HTTP client overload ---
449 } else if (transport == "websocket") {
450 if (protocol == "in") {
451 if (encryption == "legacy") {
452#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
453 startClient<web::http::legacy::in::Client>( //
454 fullInstanceName,
455 [&broker](net::in::stream::legacy::config::ConfigSocketClient* config) {
456 config->setDisableNagleAlgorithm();
457
458 config->Remote::setHost(broker.getAddress()["host"]);
459 config->Remote::setPort(broker.getAddress()["port"]);
460
461 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
462 });
463#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WS
464 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
465 << "' not supported.";
466#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WS
467 } else if (encryption == "tls") {
468#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
469 startClient<web::http::tls::in::Client>( //
470 fullInstanceName,
471 [&broker](net::in::stream::tls::config::ConfigSocketClient* config) {
472 config->setDisableNagleAlgorithm();
473
474 config->Remote::setHost(broker.getAddress()["host"]);
475 config->Remote::setPort(broker.getAddress()["port"]);
476
477 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
478 });
479#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WSS
480 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
481 << "' not supported.";
482#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WSS
483 }
484 } else if (protocol == "in6") {
485 if (encryption == "legacy") {
486#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
487 startClient<web::http::legacy::in6::Client>( //
488 fullInstanceName,
489 [&broker](net::in6::stream::legacy::config::ConfigSocketClient* config) {
490 config->setDisableNagleAlgorithm();
491
492 config->Remote::setHost(broker.getAddress()["host"]);
493 config->Remote::setPort(broker.getAddress()["port"]);
494
495 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
496 });
497#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WS
498 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
499 << "' not supported.";
500#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WS
501 } else if (encryption == "tls") {
502#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
503 startClient<web::http::tls::in6::Client>( //
504 fullInstanceName,
505 [&broker](net::in6::stream::tls::config::ConfigSocketClient* config) {
506 config->setDisableNagleAlgorithm();
507
508 config->Remote::setHost(broker.getAddress()["host"]);
509 config->Remote::setPort(broker.getAddress()["port"]);
510
511 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
512 });
513#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WSS
514 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
515 << "' not supported.";
516#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WSS
517 }
518 } else if (protocol == "un") {
// Unix-domain websocket: the socket path is read from address key "path" here
// (the stream branches above use "host").
519 if (encryption == "legacy") {
520#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
521 startClient<web::http::legacy::un::Client>( //
522 fullInstanceName,
523 [&broker](net::un::stream::legacy::config::ConfigSocketClient* config) {
524 config->Remote::setSunPath(broker.getAddress()["path"]);
525
526 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
527 });
528#else // CONFIG_MQTTSUITE_BRIDGE_UNIX && CONFIG_MQTTSUITE_BRIDGE_WS
529 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
530 << "' not supported.";
531#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX && CONFIG_MQTTSUITE_BRIDGE_WS
532 } else if (encryption == "tls") {
533#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
534 startClient<web::http::tls::un::Client>( //
535 fullInstanceName,
536 [&broker](net::un::stream::tls::config::ConfigSocketClient* config) {
537 config->Remote::setSunPath(broker.getAddress()["path"]);
538
539 config->setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
540 });
541#else // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS && CONFIG_MQTTSUITE_BRIDGE_WSS
542 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
543 << "' not supported.";
544#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS && CONFIG_MQTTSUITE_BRIDGE_WSS
545 }
546 }
547 } else {
548 VLOG(1) << " Transport '" << transport << "' not supported.";
549 }
550 } else {
// Disabled broker: only notify SSE listeners.
551 mqtt::bridge::lib::SSEDistributor::instance().brokerDisabled(bridgeName, fullInstanceName);
552 }
553 }
554 } else {
// Disabled bridge: only notify SSE listeners.
555 mqtt::bridge::lib::SSEDistributor::instance().bridgeDisabled(bridgeName);
556 }
557 }
558}
559
// Program entry point.
//
// Registers the ConfigBridge sub-command, initialises SNode.C, builds the admin
// HTTP router (bridge config GET/PATCH, SSE stream, static config UI and
// redirects), creates a legacy admin server on port 8081 and a TLS admin server
// on port 8082, and finally enters the event loop via core::SNodeC::start(),
// whose result is the process exit code.
//
// NOTE(review): listing lines 638, 645 and 647 are absent from this extract, so
// the TLS server call and the final `if` statement appear incomplete below.
560int main(int argc, char* argv[]) {
561 utils::Config::configRoot.newSubCommand<mqtt::bridge::ConfigBridge>();
562
563 core::SNodeC::init(argc, argv);
564
565 const express::Router router(express::middleware::JsonMiddleware());
566
// GET: return the bridge configuration as pretty-printed JSON (indent 4).
567 router.get("/api/bridge/config", [] APPLICATION(req, res) { // cppcheck-suppress unknownMacro
568 res->send(mqtt::bridge::lib::BridgeStore::instance().getBridgesConfigJson().dump(4));
569 });
570
// PATCH: apply a JSON patch to the bridge configuration.
//   - restart already pending    -> 409
//   - BridgeStore::patch() fails -> 404
//   - success -> reply, close all bridges, and restart immediately when
//     closeBridges() reports that nothing is left to wait for
//   - request carries no nlohmann::json attribute -> 400
571 router.patch("/api/bridge/config", [] APPLICATION(req, res) {
572 req->getAttribute<nlohmann::json>(
573 [&res](nlohmann::json& jsonPatch) {
574 if (!restart) {
575 if (mqtt::bridge::lib::BridgeStore::instance().patch(jsonPatch)) {
576 res->send(R"({"success": true, "message": "Bridge config patch applied"})"_json.dump());
577
578 if (closeBridges()) {
579 restartBridges();
580 }
581 } else {
// NOTE(review): the response message below contains the typo "applie".
582 res->status(404).send(R"({"success": false, "message": "Bridge config patch failed to applie"})"_json.dump());
583 }
584 } else {
585 res->status(409).send(
586 R"({"success": false, "message": "Bridge is in restarting state. Patch not applied"})"_json.dump());
587 }
588 },
589 [&res](const std::string& key) {
590 VLOG(1) << "Attribute type not found: " << key;
591
592 res->status(400).send("Attribute type not found: " + key);
593 });
594 });
595
// SSE endpoint: clients whose Accept header contains text/event-stream get the
// event-stream headers and are registered with the SSEDistributor together
// with their Last-Event-ID header; all other clients are redirected.
596 router.get("/api/bridge/sse", [] APPLICATION(req, res) {
597 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
598 res->set("Content-Type", "text/event-stream") //
599 .set("Cache-Control", "no-cache")
600 .set("Connection", "keep-alive");
601 res->sendHeader();
602
// NOTE(review): `data` is never used after this line.
603 std::string data{"data"};
604 mqtt::bridge::lib::SSEDistributor::instance().addEventReceiver(res, req->get("Last-Event-ID"));
605 } else {
// NOTE(review): no "/clients" route is registered in this function; the
// request would presumably end up in the "*" handler below - confirm the
// intended target.
606 res->redirect("/clients");
607 }
608 });
609
610 router.get("/", [] APPLICATION(req, res) {
611 res->redirect("/config");
612 });
613
614 router.get("/config", [] APPLICATION(req, res) {
615 res->redirect("/config/index.html");
616 });
617
// Static files for the config UI, served from the configured HTML directory.
618 router.use("/config",
619 express::middleware::StaticMiddleware(utils::Config::configRoot.getSubCommand<mqtt::bridge::ConfigBridge>()->getHtmlDir()));
620
// Catch-all: send every other GET to the config UI.
621 router.get("*", [] APPLICATION(req, res) {
622 res->redirect("/config/index.html");
623 });
624
// Admin interface without TLS, port 8081.
625 express::legacy::in::Server( //
626 "admin-legacy",
627 router,
628 reportState, //
629 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
630 config->setPort(8081);
631 config->setRetry();
632 config->setReuseAddress();
633 });
634
// Admin interface with TLS, port 8082.
// NOTE(review): listing line 638 is absent here; the legacy server above passes
// reportState at this position.
635 express::tls::in::Server( //
636 "admin-tls",
637 router,
639 [](net::in::stream::tls::config::ConfigSocketServer* config) {
640 config->setPort(8082);
641 config->setRetry();
642 config->setReuseAddress();
643 });
644
// NOTE(review): listing lines 645 and 647 are absent; what remains is the tail
// of a condition on the bridge definition file and the failure branch.
646 utils::Config::configRoot.getSubCommand<mqtt::bridge::ConfigBridge>()->getDefinitionFile())) {
648 } else {
649 VLOG(1) << "Loading bridge definition file failed";
650 }
651
652 return core::SNodeC::start();
653}
static BridgeStore & instance()
bool loadAndValidate(const std::string &fileName)
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId)
static SSEDistributor & instance()
int main(int argc, char *argv[])
static SocketClient< mqtt::bridge::SocketContextFactory > startClient(const std::string &instanceName, const std::function< void(typename SocketClient< mqtt::bridge::SocketContextFactory >::Config *)> &configurator)
static bool closeBridges()
static void handleAutoConnectControllers(core::socket::stream::AutoConnectControl *autoConnectController)
static void handleConnector(core::eventreceiver::ConnectEventReceiver *connectEventReceiver)
static std::set< core::eventreceiver::ConnectEventReceiver * > activeConnectors
static std::set< core::socket::stream::AutoConnectControl * > autoConnectControllers
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static void handleConfig(net::config::ConfigInstance *configInstance)
static std::set< const net::config::ConfigInstance * > configInstances
static void startBridges()
static bool restart
static void restartBridges()
static HttpClient startClient(const std::string &instanceName, const std::function< void(typename HttpClient::Config *)> &configurator)