MQTTSuite
Loading...
Searching...
No Matches
mqttbridge.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "SocketContextFactory.h" // IWYU pragma: keep
43#include "config.h"
44#include "lib/BridgeStore.h"
45
46#ifndef DOXYGEN_SHOULD_SKIP_THIS
47
48#include <core/SNodeC.h>
49//
50#include <log/Logger.h>
51#include <utils/Config.h>
52//
53#include <utils/CLI11.hpp>
54//
55
56// Select necessary include files
57// ==============================
58#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
59#include <net/in/stream/legacy/SocketClient.h>
60#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
61#include <net/in/stream/tls/SocketClient.h>
62#endif
63#endif
64
65#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
66#include <net/in6/stream/legacy/SocketClient.h>
67#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
68#include <net/in6/stream/tls/SocketClient.h>
69#endif
70#endif
71
72#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
73#include <net/un/stream/legacy/SocketClient.h>
74#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
75#include <net/un/stream/tls/SocketClient.h>
76#endif
77#endif
78
79#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
80#include <web/http/legacy/in/Client.h>
81#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
82#include <web/http/tls/in/Client.h>
83#endif
84#endif
85
86#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
87#include <web/http/legacy/in6/Client.h>
88#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
89#include <web/http/tls/in6/Client.h>
90#endif
91#endif
92
93#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
94#include <web/http/legacy/un/Client.h>
95#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
96#include <web/http/tls/un/Client.h>
97#endif
98#endif
99
100#include <list>
101#include <nlohmann/json.hpp>
102#include <string>
103
104#endif
105
107
108static void addBridgeBrokerConnection(const mqtt::bridge::lib::Broker& broker, core::socket::stream::SocketConnection* socketConnection) {
109 const std::string& bridgeName(broker.getBridge().getName());
110
111 VLOG(1) << "Add bridge broker: " << socketConnection->getInstanceName();
112
113 bridges[bridgeName][socketConnection->getInstanceName()] = socketConnection;
114}
115
116static void delBridgeBrokerConnection(const mqtt::bridge::lib::Broker& broker, core::socket::stream::SocketConnection* socketConnection) {
117 const std::string& bridgeName(broker.getBridge().getName());
118
119 VLOG(1) << "Del bridge broker: " << socketConnection->getInstanceName();
120
121 if (bridges.contains(bridgeName) && bridges[bridgeName].contains(socketConnection->getInstanceName())) {
122 bridges[bridgeName].erase(socketConnection->getInstanceName());
123 if (bridges[bridgeName].empty()) {
124 bridges.erase(bridgeName);
125 }
126 }
127}
128
129[[maybe_unused]] static void closeBridgeBrokerConnection(const mqtt::bridge::lib::Broker& broker, const std::string& instanceName) {
130 const std::string& bridgeName(broker.getBridge().getName());
131
132 VLOG(1) << "Force close bridge broker: " << instanceName;
133
134 if (bridges.contains(bridgeName) && bridges[bridgeName].contains(instanceName)) {
135 bridges[bridgeName][instanceName]->getSocketContext()->shutdownWrite();
136 }
137}
138
139static void
140reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
141 switch (state) {
142 case core::socket::State::OK:
143 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
144 break;
145 case core::socket::State::DISABLED:
146 VLOG(1) << instanceName << ": disabled";
147 break;
148 case core::socket::State::ERROR:
149 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
150 break;
151 case core::socket::State::FATAL:
152 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
153 break;
154 }
155}
156
157template <typename HttpClient>
158void startClient(const std::string& name,
159 const mqtt::bridge::lib::Broker& broker,
160 const std::function<void(typename HttpClient::Config&)>& configurator) {
161 using SocketAddress = typename HttpClient::SocketAddress;
162
163 HttpClient httpClient(
164 name,
165 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
166 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
167
168 req->set("Sec-WebSocket-Protocol", "mqtt");
169
170 req->upgrade(
171 "/ws",
172 "websocket",
173 [connectionName](bool success) {
174 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
175 << "mqtt" << ") start " << (success ? "success" : "failed");
176 },
177 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
178 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
179 [[maybe_unused]] bool success) {
180 },
181 [connectionName]([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req, const std::string& message) {
182 VLOG(1) << connectionName << ": Request parse error: " << message;
183 });
184 },
185 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
186 VLOG(1) << "Session ended";
187 });
188
189 configurator(httpClient.getConfig());
190
191 httpClient
192 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
193 addBridgeBrokerConnection(broker, socketConnection);
194 })
195 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
196 delBridgeBrokerConnection(broker, socketConnection);
197 })
198 .connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
199 reportState(name, socketAddress, state);
200 });
201}
202
203int main(int argc, char* argv[]) {
204#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
205 web::websocket::client::SocketContextUpgradeFactory::link();
206#endif
207
208#ifdef LINK_SUBPROTOCOL_STATIC
209 web::websocket::client::SubProtocolFactorySelector::link("mqtt", mqttClientSubProtocolFactory);
210#endif
211
212 CLI::App* bridgeApp = utils::Config::addInstance("bridge", "Configuration for Application mqttbridge", "MQTT-Bridge");
213 utils::Config::required(bridgeApp);
214
215 std::string bridgeDefinitionFile = "<REQUIRED>";
216 bridgeApp->needs(bridgeApp->add_option("--definition", bridgeDefinitionFile, "MQTT bridge definition file (JSON format)")
217 ->capture_default_str()
218 ->group(bridgeApp->get_formatter()->get_label("Persistent Options"))
219 ->type_name("path")
220 ->configurable()
221 ->required());
222
223 core::SNodeC::init(argc, argv);
224
225 if (mqtt::bridge::lib::BridgeStore::instance().loadAndValidate(bridgeDefinitionFile)) {
226 for (const auto& [fullInstanceName, broker] : mqtt::bridge::lib::BridgeStore::instance().getBrokers()) {
227 VLOG(1) << " Creating broker instance: " << fullInstanceName;
228 VLOG(1) << " Broker prefix: " << broker.getPrefix();
229 VLOG(1) << " Broker client id: " << broker.getClientId();
230 VLOG(1) << " Broker disabled: " << broker.getDisabled();
231 VLOG(1) << " Broker address: " << broker.getAddress();
232 VLOG(1) << " Broker prefix: " << broker.getPrefix();
233 VLOG(1) << " Broker username: " << broker.getUsername();
234 VLOG(1) << " Broker password: " << broker.getPassword();
235 VLOG(1) << " Broker client-id: " << broker.getClientId();
236 VLOG(1) << " Broker clean session: " << broker.getCleanSession();
237 VLOG(1) << " Broker will-topic: " << broker.getWillTopic();
238 VLOG(1) << " Broker will-message: " << broker.getWillMessage();
239 VLOG(1) << " Broker will-qos: " << static_cast<int>(broker.getWillQoS());
240 VLOG(1) << " Broker will-retain: " << broker.getWillRetain();
241 VLOG(1) << " Broker loop prevention: " << broker.getLoopPrevention();
242 VLOG(1) << " Bridge disabled: " << broker.getBridge().getDisabled();
243 VLOG(1) << " Bridge prefix: " << broker.getBridge().getPrefix();
244 VLOG(1) << " Bridge Transport: " << broker.getTransport();
245 VLOG(1) << " Bridge Protocol: " << broker.getProtocol();
246 VLOG(1) << " Bridge Encryption: " << broker.getEncryption();
247
248 VLOG(1) << " Topics:";
249 const std::list<iot::mqtt::Topic>& topics = broker.getTopics();
250 for (const iot::mqtt::Topic& topic : topics) {
251 VLOG(1) << " " << topic.getName() << ":" << static_cast<uint16_t>(topic.getQoS());
252 }
253
254 const std::string& transport = broker.getTransport();
255 const std::string& protocol = broker.getProtocol();
256 const std::string& encryption = broker.getEncryption();
257
258 if (transport == "stream") {
259 if (protocol == "in") {
260 if (encryption == "legacy") {
261#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
262 net::in::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
263 fullInstanceName,
264 [&broker](auto& config) {
265 config.setRetry();
266 config.setRetryBase(1);
267 config.setReconnect();
268 config.setDisableNagleAlgorithm();
269
270 config.Remote::setHost(broker.getAddress()["host"]);
271 config.Remote::setPort(broker.getAddress()["port"]);
272
273 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
274 },
275 broker)
276 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
277 addBridgeBrokerConnection(broker, socketConnection);
278 })
279 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
280 delBridgeBrokerConnection(broker, socketConnection);
281 })
282 .connect([&broker, fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
283 reportState(broker.getBridge().getName() + "+" + fullInstanceName, socketAddress, state);
284 });
285#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4
286 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
287 << "' not supported.";
288#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4
289 } else if (encryption == "tls") {
290#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
291 net::in::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
292 fullInstanceName,
293 [&broker](auto& config) {
294 config.setRetry();
295 config.setRetryBase(1);
296 config.setReconnect();
297 config.setDisableNagleAlgorithm();
298
299 config.Remote::setHost(broker.getAddress()["host"]);
300 config.Remote::setPort(broker.getAddress()["port"]);
301
302 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
303 },
304 broker)
305 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
306 addBridgeBrokerConnection(broker, socketConnection);
307 })
308 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
309 delBridgeBrokerConnection(broker, socketConnection);
310 })
311 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
312 reportState(fullInstanceName, socketAddress, state);
313 });
314#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4
315 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
316 << "' not supported.";
317#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4
318 }
319 } else if (protocol == "in6") {
320 if (encryption == "legacy") {
321#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
322 net::in6::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
323 fullInstanceName,
324 [&broker](auto& config) {
325 config.setRetry();
326 config.setRetryBase(1);
327 config.setReconnect();
328 config.setDisableNagleAlgorithm();
329
330 config.Remote::setHost(broker.getAddress()["host"]);
331 config.Remote::setPort(broker.getAddress()["port"]);
332
333 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
334 },
335 broker)
336 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
337 addBridgeBrokerConnection(broker, socketConnection);
338 })
339 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
340 delBridgeBrokerConnection(broker, socketConnection);
341 })
342 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
343 reportState(fullInstanceName, socketAddress, state);
344 });
345#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6
346 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
347 << "' not supported.";
348#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6
349 } else if (encryption == "tls") {
350#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
351 net::in6::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
352 fullInstanceName,
353 [&broker](auto& config) {
354 config.setRetry();
355 config.setRetryBase(1);
356 config.setReconnect();
357 config.setDisableNagleAlgorithm();
358
359 config.Remote::setHost(broker.getAddress()["host"]);
360 config.Remote::setPort(broker.getAddress()["port"]);
361
362 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
363 },
364 broker)
365 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
366 addBridgeBrokerConnection(broker, socketConnection);
367 })
368 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
369 delBridgeBrokerConnection(broker, socketConnection);
370 })
371 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
372 reportState(fullInstanceName, socketAddress, state);
373 });
374#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6
375 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
376 << "' not supported.";
377#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6
378 }
379 } else if (protocol == "un") {
380 if (encryption == "legacy") {
381#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
382 net::un::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
383 fullInstanceName,
384 [&broker](auto& config) {
385 config.setRetry();
386 config.setRetryBase(1);
387 config.setReconnect();
388
389 config.Remote::setSunPath(broker.getAddress()["host"]);
390
391 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
392 },
393 broker)
394 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
395 addBridgeBrokerConnection(broker, socketConnection);
396 })
397 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
398 delBridgeBrokerConnection(broker, socketConnection);
399 })
400 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
401 reportState(fullInstanceName, socketAddress, state);
402 });
403#else // CONFIG_MQTTSUITE_BRIDGE_UNIX
404 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
405 << "' not supported.";
406#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX
407 } else if (encryption == "tls") {
408#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
409 net::un::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
410 fullInstanceName,
411 [&broker](auto& config) {
412 config.setRetry();
413 config.setRetryBase(1);
414 config.setReconnect();
415
416 config.Remote::setSunPath(broker.getAddress()["host"]);
417
418 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
419 },
420 broker)
421 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
422 addBridgeBrokerConnection(broker, socketConnection);
423 })
424 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
425 delBridgeBrokerConnection(broker, socketConnection);
426 })
427 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
428 reportState(fullInstanceName, socketAddress, state);
429 });
430#else // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS
431 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
432 << "' not supported.";
433#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS
434 }
435 }
436 } else if (transport == "websocket") {
437 if (protocol == "in") {
438 if (encryption == "legacy") {
439#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
440 startClient<web::http::legacy::in::Client>(fullInstanceName, broker, [&broker](auto& config) {
441 config.Remote::setPort(8080);
442
443 config.setRetry();
444 config.setRetryBase(1);
445 config.setReconnect();
446 config.setDisableNagleAlgorithm();
447
448 config.Remote::setHost(broker.getAddress()["host"]);
449 config.Remote::setPort(broker.getAddress()["port"]);
450
451 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
452 });
453#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WS
454 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
455 << "' not supported.";
456#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WS
457 } else if (encryption == "tls") {
458#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
459 startClient<web::http::tls::in::Client>(fullInstanceName, broker, [&broker](auto& config) {
460 config.Remote::setPort(8088);
461
462 config.setRetry();
463 config.setRetryBase(1);
464 config.setReconnect();
465 config.setDisableNagleAlgorithm();
466
467 config.Remote::setHost(broker.getAddress()["host"]);
468 config.Remote::setPort(broker.getAddress()["port"]);
469
470 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
471 });
472#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WSS
473 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
474 << "' not supported.";
475#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4 && CONFIG_MQTTSUITE_BRIDGE_WSS
476 }
477 } else if (protocol == "in6") {
478 if (encryption == "legacy") {
479#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
480 startClient<web::http::legacy::in6::Client>(fullInstanceName, broker, [&broker](auto& config) {
481 config.Remote::setPort(8080);
482
483 config.setRetry();
484 config.setRetryBase(1);
485 config.setReconnect();
486 config.setDisableNagleAlgorithm();
487
488 config.Remote::setHost(broker.getAddress()["host"]);
489 config.Remote::setPort(broker.getAddress()["port"]);
490
491 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
492 });
493#else // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WS
494 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
495 << "' not supported.";
496#endif // CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6&& CONFIG_MQTTSUITE_BRIDGE_WS
497 } else if (encryption == "tls") {
498#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
499 startClient<web::http::tls::in6::Client>(fullInstanceName, broker, [&broker](auto& config) {
500 config.Remote::setPort(8088);
501
502 config.setRetry();
503 config.setRetryBase(1);
504 config.setReconnect();
505 config.setDisableNagleAlgorithm();
506
507 config.Remote::setHost(broker.getAddress()["host"]);
508 config.Remote::setPort(broker.getAddress()["port"]);
509
510 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
511 });
512#else // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WSS
513 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
514 << "' not supported.";
515#endif // CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6 && CONFIG_MQTTSUITE_BRIDGE_WSS
516 }
517 } else if (protocol == "un") {
518 if (encryption == "legacy") {
519#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
520 startClient<web::http::legacy::un::Client>(fullInstanceName, broker, [&broker](auto& config) {
521 config.setRetry();
522 config.setRetryBase(1);
523 config.setReconnect();
524
525 config.Remote::setSunPath(broker.getAddress()["path"]);
526
527 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
528 });
529#else // CONFIG_MQTTSUITE_BRIDGE_UNIX && CONFIG_MQTTSUITE_BRIDGE_WS
530 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
531 << "' not supported.";
532#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX && CONFIG_MQTTSUITE_BRIDGE_WS
533 } else if (encryption == "tls") {
534#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
535 startClient<web::http::tls::un::Client>(fullInstanceName, broker, [&broker](auto& config) {
536 config.setRetry();
537 config.setRetryBase(1);
538 config.setReconnect();
539
540 config.Remote::setSunPath(broker.getAddress()["path"]);
541
542 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
543 });
544#else // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS && CONFIG_MQTTSUITE_BRIDGE_WSS
545 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
546 << "' not supported.";
547#endif // CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS && CONFIG_MQTTSUITE_BRIDGE_WSS
548 }
549 }
550 } else {
551 VLOG(1) << " Transport '" << transport << "' not supported.";
552 }
553 }
554 }
555
556 return core::SNodeC::start();
557}
static BridgeStore & instance()
bool loadAndValidate(const std::string &fileName)
const std::string & getName()
Definition Bridge.cpp:61
Bridge & getBridge() const
Definition Broker.cpp:94
int main(int argc, char *argv[])
static void addBridgeBrokerConnection(const mqtt::bridge::lib::Broker &broker, core::socket::stream::SocketConnection *socketConnection)
static std::map< std::string, std::map< std::string, core::socket::stream::SocketConnection * > > bridges
static void closeBridgeBrokerConnection(const mqtt::bridge::lib::Broker &broker, const std::string &instanceName)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
void startClient(const std::string &name, const mqtt::bridge::lib::Broker &broker, const std::function< void(typename HttpClient::Config &)> &configurator)
static void delBridgeBrokerConnection(const mqtt::bridge::lib::Broker &broker, core::socket::stream::SocketConnection *socketConnection)