203int main(
int argc,
char* argv[]) {
204#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
205 web::websocket::client::SocketContextUpgradeFactory::link();
208#ifdef LINK_SUBPROTOCOL_STATIC
209 web::websocket::client::SubProtocolFactorySelector::link(
"mqtt", mqttClientSubProtocolFactory);
212 CLI::App* bridgeApp = utils::Config::addInstance(
"bridge",
"Configuration for Application mqttbridge",
"MQTT-Bridge");
213 utils::Config::required(bridgeApp);
215 std::string bridgeDefinitionFile =
"<REQUIRED>";
216 bridgeApp->needs(bridgeApp->add_option(
"--definition", bridgeDefinitionFile,
"MQTT bridge definition file (JSON format)")
217 ->capture_default_str()
218 ->group(bridgeApp->get_formatter()->get_label(
"Persistent Options"))
223 core::SNodeC::init(argc, argv);
226 for (
const auto& [fullInstanceName, broker] : mqtt::bridge::lib::BridgeStore::instance().getBrokers()) {
227 VLOG(1) <<
" Creating broker instance: " << fullInstanceName;
228 VLOG(1) <<
" Broker prefix: " << broker.getPrefix();
229 VLOG(1) <<
" Broker client id: " << broker.getClientId();
230 VLOG(1) <<
" Broker disabled: " << broker.getDisabled();
231 VLOG(1) <<
" Broker address: " << broker.getAddress();
232 VLOG(1) <<
" Broker prefix: " << broker.getPrefix();
233 VLOG(1) <<
" Broker username: " << broker.getUsername();
234 VLOG(1) <<
" Broker password: " << broker.getPassword();
235 VLOG(1) <<
" Broker client-id: " << broker.getClientId();
236 VLOG(1) <<
" Broker clean session: " << broker.getCleanSession();
237 VLOG(1) <<
" Broker will-topic: " << broker.getWillTopic();
238 VLOG(1) <<
" Broker will-message: " << broker.getWillMessage();
239 VLOG(1) <<
" Broker will-qos: " <<
static_cast<
int>(broker.getWillQoS());
240 VLOG(1) <<
" Broker will-retain: " << broker.getWillRetain();
241 VLOG(1) <<
" Broker loop prevention: " << broker.getLoopPrevention();
242 VLOG(1) <<
" Bridge disabled: " << broker.getBridge().getDisabled();
243 VLOG(1) <<
" Bridge prefix: " << broker.getBridge().getPrefix();
244 VLOG(1) <<
" Bridge Transport: " << broker.getTransport();
245 VLOG(1) <<
" Bridge Protocol: " << broker.getProtocol();
246 VLOG(1) <<
" Bridge Encryption: " << broker.getEncryption();
248 VLOG(1) <<
" Topics:";
249 const std::list<iot::mqtt::Topic>& topics = broker.getTopics();
250 for (
const iot::mqtt::Topic& topic : topics) {
251 VLOG(1) <<
" " << topic.getName() <<
":" <<
static_cast<uint16_t>(topic.getQoS());
254 const std::string& transport = broker.getTransport();
255 const std::string& protocol = broker.getProtocol();
256 const std::string& encryption = broker.getEncryption();
258 if (transport ==
"stream") {
259 if (protocol ==
"in") {
260 if (encryption ==
"legacy") {
261#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4)
262 net::in::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
264 [&broker](
auto& config) {
266 config.setRetryBase(1);
267 config.setReconnect();
268 config.setDisableNagleAlgorithm();
270 config.Remote::setHost(broker.getAddress()[
"host"]);
271 config.Remote::setPort(broker.getAddress()[
"port"]);
273 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
276 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
277 addBridgeBrokerConnection(broker, socketConnection);
279 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
280 delBridgeBrokerConnection(broker, socketConnection);
282 .connect([&broker, fullInstanceName](
const auto& socketAddress,
const core::socket::State& state) {
283 reportState(broker.getBridge().getName() +
"+" + fullInstanceName, socketAddress, state);
286 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
287 <<
"' not supported.";
289 }
else if (encryption ==
"tls") {
290#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4)
291 net::in::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
293 [&broker](
auto& config) {
295 config.setRetryBase(1);
296 config.setReconnect();
297 config.setDisableNagleAlgorithm();
299 config.Remote::setHost(broker.getAddress()[
"host"]);
300 config.Remote::setPort(broker.getAddress()[
"port"]);
302 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
305 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
306 addBridgeBrokerConnection(broker, socketConnection);
308 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
309 delBridgeBrokerConnection(broker, socketConnection);
311 .connect([fullInstanceName](
const auto& socketAddress,
const core::socket::State& state) {
312 reportState(fullInstanceName, socketAddress, state);
315 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
316 <<
"' not supported.";
319 }
else if (protocol ==
"in6") {
320 if (encryption ==
"legacy") {
321#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6)
322 net::in6::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
324 [&broker](
auto& config) {
326 config.setRetryBase(1);
327 config.setReconnect();
328 config.setDisableNagleAlgorithm();
330 config.Remote::setHost(broker.getAddress()[
"host"]);
331 config.Remote::setPort(broker.getAddress()[
"port"]);
333 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
336 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
337 addBridgeBrokerConnection(broker, socketConnection);
339 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
340 delBridgeBrokerConnection(broker, socketConnection);
342 .connect([fullInstanceName](
const auto& socketAddress,
const core::socket::State& state) {
343 reportState(fullInstanceName, socketAddress, state);
346 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
347 <<
"' not supported.";
349 }
else if (encryption ==
"tls") {
350#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6)
351 net::in6::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
353 [&broker](
auto& config) {
355 config.setRetryBase(1);
356 config.setReconnect();
357 config.setDisableNagleAlgorithm();
359 config.Remote::setHost(broker.getAddress()[
"host"]);
360 config.Remote::setPort(broker.getAddress()[
"port"]);
362 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
365 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
366 addBridgeBrokerConnection(broker, socketConnection);
368 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
369 delBridgeBrokerConnection(broker, socketConnection);
371 .connect([fullInstanceName](
const auto& socketAddress,
const core::socket::State& state) {
372 reportState(fullInstanceName, socketAddress, state);
375 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
376 <<
"' not supported.";
379 }
else if (protocol ==
"un") {
380 if (encryption ==
"legacy") {
381#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX)
382 net::un::stream::legacy::Client<mqtt::bridge::SocketContextFactory>(
384 [&broker](
auto& config) {
386 config.setRetryBase(1);
387 config.setReconnect();
389 config.Remote::setSunPath(broker.getAddress()[
"host"]);
391 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
394 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
395 addBridgeBrokerConnection(broker, socketConnection);
397 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
398 delBridgeBrokerConnection(broker, socketConnection);
400 .connect([fullInstanceName](
const auto& socketAddress,
const core::socket::State& state) {
401 reportState(fullInstanceName, socketAddress, state);
404 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
405 <<
"' not supported.";
407 }
else if (encryption ==
"tls") {
408#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS)
409 net::un::stream::tls::Client<mqtt::bridge::SocketContextFactory>(
411 [&broker](
auto& config) {
413 config.setRetryBase(1);
414 config.setReconnect();
416 config.Remote::setSunPath(broker.getAddress()[
"host"]);
418 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
421 .setOnConnected([&broker](core::socket::stream::SocketConnection* socketConnection) {
422 addBridgeBrokerConnection(broker, socketConnection);
424 .setOnDisconnect([&broker](core::socket::stream::SocketConnection* socketConnection) {
425 delBridgeBrokerConnection(broker, socketConnection);
427 .connect([fullInstanceName](
const auto& socketAddress,
const core::socket::State& state) {
428 reportState(fullInstanceName, socketAddress, state);
431 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
432 <<
"' not supported.";
436 }
else if (transport ==
"websocket") {
437 if (protocol ==
"in") {
438 if (encryption ==
"legacy") {
439#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
440 startClient<web::http::legacy::in::Client>(fullInstanceName, broker, [&broker](
auto& config) {
441 config.Remote::setPort(8080);
444 config.setRetryBase(1);
445 config.setReconnect();
446 config.setDisableNagleAlgorithm();
448 config.Remote::setHost(broker.getAddress()[
"host"]);
449 config.Remote::setPort(broker.getAddress()[
"port"]);
451 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
454 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
455 <<
"' not supported.";
457 }
else if (encryption ==
"tls") {
458#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV4) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
459 startClient<web::http::tls::in::Client>(fullInstanceName, broker, [&broker](
auto& config) {
460 config.Remote::setPort(8088);
463 config.setRetryBase(1);
464 config.setReconnect();
465 config.setDisableNagleAlgorithm();
467 config.Remote::setHost(broker.getAddress()[
"host"]);
468 config.Remote::setPort(broker.getAddress()[
"port"]);
470 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
473 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
474 <<
"' not supported.";
477 }
else if (protocol ==
"in6") {
478 if (encryption ==
"legacy") {
479#if defined(CONFIG_MQTTSUITE_BRIDGE_TCP_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
480 startClient<web::http::legacy::in6::Client>(fullInstanceName, broker, [&broker](
auto& config) {
481 config.Remote::setPort(8080);
484 config.setRetryBase(1);
485 config.setReconnect();
486 config.setDisableNagleAlgorithm();
488 config.Remote::setHost(broker.getAddress()[
"host"]);
489 config.Remote::setPort(broker.getAddress()[
"port"]);
491 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
494 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
495 <<
"' not supported.";
497 }
else if (encryption ==
"tls") {
498#if defined(CONFIG_MQTTSUITE_BRIDGE_TLS_IPV6) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
499 startClient<web::http::tls::in6::Client>(fullInstanceName, broker, [&broker](
auto& config) {
500 config.Remote::setPort(8088);
503 config.setRetryBase(1);
504 config.setReconnect();
505 config.setDisableNagleAlgorithm();
507 config.Remote::setHost(broker.getAddress()[
"host"]);
508 config.Remote::setPort(broker.getAddress()[
"port"]);
510 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
513 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
514 <<
"' not supported.";
517 }
else if (protocol ==
"un") {
518 if (encryption ==
"legacy") {
519#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX) && defined(CONFIG_MQTTSUITE_BRIDGE_WS)
520 startClient<web::http::legacy::un::Client>(fullInstanceName, broker, [&broker](
auto& config) {
522 config.setRetryBase(1);
523 config.setReconnect();
525 config.Remote::setSunPath(broker.getAddress()[
"path"]);
527 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
530 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
531 <<
"' not supported.";
533 }
else if (encryption ==
"tls") {
534#if defined(CONFIG_MQTTSUITE_BRIDGE_UNIX_TLS) && defined(CONFIG_MQTTSUITE_BRIDGE_WSS)
535 startClient<web::http::tls::un::Client>(fullInstanceName, broker, [&broker](
auto& config) {
537 config.setRetryBase(1);
538 config.setReconnect();
540 config.Remote::setSunPath(broker.getAddress()[
"path"]);
542 config.setDisabled(broker.getDisabled() || broker.getBridge().getDisabled());
545 VLOG(1) <<
" Transport '" << transport <<
"', protocol '" << protocol <<
"', encryption '" << encryption
546 <<
"' not supported.";
551 VLOG(1) <<
" Transport '" << transport <<
"' not supported.";
556 return core::SNodeC::start();