203 {
204#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
205 web::websocket::client::SocketContextUpgradeFactory::link();
206#endif
207
208#ifdef LINK_SUBPROTOCOL_STATIC
210#endif
211
212 CLI::App* bridgeApp = utils::Config::addInstance("bridge", "Configuration for Application mqttbridge", "MQTT-Bridge");
213 utils::Config::required(bridgeApp);
214
215 std::string bridgeDefinitionFile = "<REQUIRED>";
216 bridgeApp->needs(bridgeApp->add_option("--definition", bridgeDefinitionFile, "MQTT bridge definition file (JSON format)")
217 ->capture_default_str()
218 ->group(bridgeApp->get_formatter()->get_label("Persistent Options"))
219 ->type_name("path")
220 ->configurable()
221 ->required());
222
223 core::SNodeC::init(argc, argv);
224
227 VLOG(1) << " Creating broker instance: " << fullInstanceName;
228 VLOG(1) << " Broker prefix: " << broker.getPrefix();
229 VLOG(1) << " Broker client id: " << broker.getClientId();
230 VLOG(1) << " Broker disabled: " << broker.getDisabled();
231 VLOG(1) << " Broker address: " << broker.getAddress();
232 VLOG(1) << " Broker prefix: " << broker.getPrefix();
233 VLOG(1) << " Broker username: " << broker.getUsername();
234 VLOG(1) << " Broker password: " << broker.getPassword();
235 VLOG(1) << " Broker client-id: " << broker.getClientId();
236 VLOG(1) << " Broker clean session: " << broker.getCleanSession();
237 VLOG(1) << " Broker will-topic: " << broker.getWillTopic();
238 VLOG(1) << " Broker will-message: " << broker.getWillMessage();
239 VLOG(1) << " Broker will-qos: " << static_cast<int>(broker.getWillQoS());
240 VLOG(1) << " Broker will-retain: " << broker.getWillRetain();
241 VLOG(1) << " Broker loop prevention: " << broker.getLoopPrevention();
242 VLOG(1) << " Bridge disabled: " << broker.getBridge().getDisabled();
243 VLOG(1) << " Bridge prefix: " << broker.getBridge().getPrefix();
244 VLOG(1) << " Bridge Transport: " << broker.getTransport();
245 VLOG(1) << " Bridge Protocol: " << broker.getProtocol();
246 VLOG(1) << " Bridge Encryption: " << broker.getEncryption();
247
248 VLOG(1) << " Topics:";
249 const std::list<iot::mqtt::Topic>& topics = broker.getTopics();
250 for (const iot::mqtt::Topic& topic : topics) {
251 VLOG(1) << " " << topic.getName() << ":" << static_cast<uint16_t>(topic.getQoS());
252 }
253
254 const std::string& transport = broker.getTransport();
255 const std::string& protocol = broker.getProtocol();
256 const std::string& encryption = broker.getEncryption();
257
258 if (transport == "stream") {
259 if (protocol == "in") {
260 if (encryption == "legacy") {
261#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
262 net::in::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
263 fullInstanceName,
264 [&broker](auto& config) {
265 config.setRetry();
266 config.setRetryBase(1);
267 config.setReconnect();
268 config.setDisableNagleAlgorithm();
269
270 config.Remote::setHost(broker.getAddress()["host"]);
271 config.Remote::setPort(broker.getAddress()["port"]);
272
273 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
274 },
275 broker)
276 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
278 })
279 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
281 })
282 .connect([&broker, fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
283 reportState(broker.getBridge().getName() +
"+" + fullInstanceName, socketAddress, state);
284 });
285#else
286 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
287 << "' not supported.";
288#endif
289 } else if (encryption == "tls") {
290#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
291 net::in::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
292 fullInstanceName,
293 [&broker](auto& config) {
294 config.setRetry();
295 config.setRetryBase(1);
296 config.setReconnect();
297 config.setDisableNagleAlgorithm();
298
299 config.Remote::setHost(broker.getAddress()["host"]);
300 config.Remote::setPort(broker.getAddress()["port"]);
301
302 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
303 },
304 broker)
305 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
307 })
308 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
310 })
311 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
312 reportState(fullInstanceName, socketAddress, state);
313 });
314#else
315 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
316 << "' not supported.";
317#endif
318 }
319 } else if (protocol == "in6") {
320 if (encryption == "legacy") {
321#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
322 net::in6::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
323 fullInstanceName,
324 [&broker](auto& config) {
325 config.setRetry();
326 config.setRetryBase(1);
327 config.setReconnect();
328 config.setDisableNagleAlgorithm();
329
330 config.Remote::setHost(broker.getAddress()["host"]);
331 config.Remote::setPort(broker.getAddress()["port"]);
332
333 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
334 },
335 broker)
336 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
338 })
339 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
341 })
342 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
343 reportState(fullInstanceName, socketAddress, state);
344 });
345#else
346 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
347 << "' not supported.";
348#endif
349 } else if (encryption == "tls") {
350#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
351 net::in6::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
352 fullInstanceName,
353 [&broker](auto& config) {
354 config.setRetry();
355 config.setRetryBase(1);
356 config.setReconnect();
357 config.setDisableNagleAlgorithm();
358
359 config.Remote::setHost(broker.getAddress()["host"]);
360 config.Remote::setPort(broker.getAddress()["port"]);
361
362 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
363 },
364 broker)
365 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
367 })
368 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
370 })
371 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
372 reportState(fullInstanceName, socketAddress, state);
373 });
374#else
375 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
376 << "' not supported.";
377#endif
378 }
379 } else if (protocol == "un") {
380 if (encryption == "legacy") {
381#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
382 net::un::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
383 fullInstanceName,
384 [&broker](auto& config) {
385 config.setRetry();
386 config.setRetryBase(1);
387 config.setReconnect();
388
389 config.Remote::setSunPath(broker.getAddress()["host"]);
390
391 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
392 },
393 broker)
394 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
396 })
397 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
399 })
400 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
401 reportState(fullInstanceName, socketAddress, state);
402 });
403#else
404 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
405 << "' not supported.";
406#endif
407 } else if (encryption == "tls") {
408#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
409 net::un::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
410 fullInstanceName,
411 [&broker](auto& config) {
412 config.setRetry();
413 config.setRetryBase(1);
414 config.setReconnect();
415
416 config.Remote::setSunPath(broker.getAddress()["host"]);
417
418 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
419 },
420 broker)
421 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
423 })
424 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
426 })
427 .connect([fullInstanceName](const auto& socketAddress, const core::socket::State& state) {
428 reportState(fullInstanceName, socketAddress, state);
429 });
430#else
431 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
432 << "' not supported.";
433#endif
434 }
435 }
436 } else if (transport == "websocket") {
437 if (protocol == "in") {
438 if (encryption == "legacy") {
439#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
441 config.Remote::setPort(8080);
442
443 config.setRetry();
444 config.setRetryBase(1);
445 config.setReconnect();
446 config.setDisableNagleAlgorithm();
447
448 config.Remote::setHost(broker.getAddress()["host"]);
449 config.Remote::setPort(broker.getAddress()["port"]);
450
451 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
452 });
453#else
454 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
455 << "' not supported.";
456#endif
457 } else if (encryption == "tls") {
458#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
460 config.Remote::setPort(8088);
461
462 config.setRetry();
463 config.setRetryBase(1);
464 config.setReconnect();
465 config.setDisableNagleAlgorithm();
466
467 config.Remote::setHost(broker.getAddress()["host"]);
468 config.Remote::setPort(broker.getAddress()["port"]);
469
470 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
471 });
472#else
473 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
474 << "' not supported.";
475#endif
476 }
477 } else if (protocol == "in6") {
478 if (encryption == "legacy") {
479#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
481 config.Remote::setPort(8080);
482
483 config.setRetry();
484 config.setRetryBase(1);
485 config.setReconnect();
486 config.setDisableNagleAlgorithm();
487
488 config.Remote::setHost(broker.getAddress()["host"]);
489 config.Remote::setPort(broker.getAddress()["port"]);
490
491 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
492 });
493#else
494 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
495 << "' not supported.";
496#endif
497 } else if (encryption == "tls") {
498#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
500 config.Remote::setPort(8088);
501
502 config.setRetry();
503 config.setRetryBase(1);
504 config.setReconnect();
505 config.setDisableNagleAlgorithm();
506
507 config.Remote::setHost(broker.getAddress()["host"]);
508 config.Remote::setPort(broker.getAddress()["port"]);
509
510 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
511 });
512#else
513 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
514 << "' not supported.";
515#endif
516 }
517 } else if (protocol == "un") {
518 if (encryption == "legacy") {
519#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
521 config.setRetry();
522 config.setRetryBase(1);
523 config.setReconnect();
524
525 config.Remote::setSunPath(broker.getAddress()["path"]);
526
527 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
528 });
529#else
530 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
531 << "' not supported.";
532#endif
533 } else if (encryption == "tls") {
534#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
536 config.setRetry();
537 config.setRetryBase(1);
538 config.setReconnect();
539
540 config.Remote::setSunPath(broker.getAddress()["path"]);
541
542 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
543 });
544#else
545 VLOG(1) << " Transport '" << transport << "', protocol '" << protocol << "', encryption '" << encryption
546 << "' not supported.";
547#endif
548 }
549 }
550 } else {
551 VLOG(1) << " Transport '" << transport << "' not supported.";
552 }
553 }
554 }
555
556 return core::SNodeC::start();
557}
static BridgeStore & instance()
mqtt::mqttbridge::websocket::SubProtocolFactory * mqttClientSubProtocolFactory()
static void addBridgeBrokerConnection(const mqtt::bridge::lib::Broker &broker, core::socket::stream::SocketConnection *socketConnection)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
void startClient(const std::string &name, const mqtt::bridge::lib::Broker &broker, const std::function< void(typename HttpClient::Config &)> &configurator)
static void delBridgeBrokerConnection(const mqtt::bridge::lib::Broker &broker, core::socket::stream::SocketConnection *socketConnection)