MQTTSuite
Loading...
Searching...
No Matches
mqttbroker.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "SocketContextFactory.h" // IWYU pragma: keep
43#include "config.h"
44#include "lib/Mqtt.h"
45#include "lib/MqttModel.h"
46
47#ifdef __GNUC__
48#pragma GCC diagnostic push
49#ifdef __has_warning
50#if __has_warning("-Wcovered-switch-default")
51#pragma GCC diagnostic ignored "-Wcovered-switch-default"
52#endif
53#if __has_warning("-Wnrvo")
54#pragma GCC diagnostic ignored "-Wnrvo"
55#endif
56#endif
57#endif
58#include "lib/inja.hpp"
59#ifdef __GNUC_
60#pragma GCC diagnostic pop
61#endif
62
63#ifndef DOXYGEN_SHOULD_SKIP_THIS
64
65#include <core/SNodeC.h>
66#include <iot/mqtt/MqttContext.h>
67#include <utils/Config.h>
68//
69
70#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
71#include <express/legacy/in/Server.h>
72#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
73#include <express/tls/in/Server.h>
74#endif
75#endif
76
77#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
78#include <express/legacy/in6/Server.h>
79#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
80#include <express/tls/in6/Server.h>
81#endif
82#endif
83
84#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
85#include <express/legacy/un/Server.h>
86#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
87#include <express/tls/un/Server.h>
88#endif
89#endif
90
91//
92#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
93#include <net/in/stream/legacy/SocketServer.h>
94#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
95#include <net/in/stream/tls/SocketServer.h>
96#endif
97#endif
98
99#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
100#include <net/in6/stream/legacy/SocketServer.h>
101#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
102#include <net/in6/stream/tls/SocketServer.h>
103#endif
104#endif
105
106#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
107#include <net/un/stream/legacy/SocketServer.h>
108#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
109#include <net/un/stream/tls/SocketServer.h>
110#endif
111#endif
112
113#include <web/http/http_utils.h>
114//
115#include <express/middleware/JsonMiddleware.h>
116#include <iot/mqtt/server/broker/Broker.h>
117//
118#include <log/Logger.h>
119//
120#include <nlohmann/json.hpp>
121// IWYU pragma: no_include <nlohmann/json_fwd.hpp>
122// IWYU pragma: no_include <nlohmann/detail/json_ref.hpp>
123//
124#include <cctype>
125#include <cstdlib>
126#include <iomanip>
127#include <iterator>
128#include <list>
129#include <sstream>
130#include <string>
131#include <utility>
132
133#endif
134
135static void upgrade APPLICATION(req, res) {
136 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
137
138 VLOG(2) << connectionName << " HTTP: Upgrade request:\n"
139 << httputils::toString(req->method,
140 req->url,
141 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
142 req->queries,
143 req->headers,
144 req->trailer,
145 req->cookies,
146 std::vector<char>());
147
148 if (req->get("sec-websocket-protocol").find("mqtt") != std::string::npos) {
149 res->upgrade(req, [req, res, connectionName](const std::string& name) {
150 if (!name.empty()) {
151 VLOG(1) << connectionName << ": Successful upgrade:";
152 VLOG(1) << connectionName << ": Selected: " << name;
153 VLOG(1) << connectionName << ": Requested: " << req->get("sec-websocket-protocol");
154
155 res->end();
156 } else {
157 VLOG(1) << connectionName << ": Can not upgrade to any of '" << req->get("upgrade") << "'";
158
159 res->sendStatus(404);
160 }
161 });
162 } else {
163 VLOG(1) << connectionName << ": Unsupported subprotocol(s):";
164 VLOG(1) << " Expected: mqtt";
165 VLOG(1) << " Requested: " << req->get("sec-websocket-protocol");
166
167 res->sendStatus(404);
168 }
169}
170
171static std::string href(const std::string& text, const std::string& link) {
172 return "<a href=\"" + link + "\" style=\"color:inherit;\">" + text + "</a>";
173}
174
175static std::string href(const std::string& text, const std::string& url, const std::string& windowId, uint16_t width, uint16_t height) {
176 return "<a href=\"#\" onClick=\""
177 "let key = '" +
178 windowId +
179 "'; "
180 "if (!localStorage.getItem(key)) "
181 " localStorage.setItem(key, key + '-' + Math.random().toString(36).substr(2, 6)); "
182 "let uniqueId = localStorage.getItem(key); "
183 "if (!window._openWindows) window._openWindows = {}; "
184 "if (!window._openWindows[uniqueId] || window._openWindows[uniqueId].closed) { "
185 " window._openWindows[uniqueId] = window.open('" +
186 url + "', uniqueId, 'width=" + std::to_string(width) + ", height=" + std::to_string(height) +
187 ",location=no, menubar=no, status=no, toolbar=no'); "
188 "} else { "
189 " window._openWindows[uniqueId].focus(); "
190 "} return false;\" "
191 "style=\"color:inherit;\">" +
192 text + "</a>";
193}
194
195static std::string getOverviewPage(inja::Environment environment,
196 std::shared_ptr<iot::mqtt::server::broker::Broker> broker,
197 mqtt::mqttbroker::lib::MqttModel& mqttModel) {
198 inja::json json;
199
200 json["table-id"] = "overview";
201 json["title"] = "MQTTBroker | Active Clients";
202 json["voc"] = href("Volker Christian", "https://github.com/VolkerChristian/");
203 json["broker"] = href("MQTTBroker", "https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker");
204 json["suite"] = href("MQTTSuite", "https://github.com/SNodeC/mqttsuite");
205 json["snodec"] = "Powered by " + href("SNode.C", "https://github.com/SNodeC/snode.c");
206 json["since"] = mqttModel.onlineSince();
207 json["duration"] = mqttModel.onlineDuration();
208 json["header_row"] = {"Client ID", "Online Since", "Duration", "Connection", "Local Address", "Remote Address", "Action"};
209 json["data_rows"] = inja::json::array();
210 json["subscribed_topics"] = inja::json::array();
211 json["retained_topics"] = inja::json::array();
212
213 const std::map<std::string, std::list<std::pair<std::string, uint8_t>>>& subscribedTopics = broker->getSubscriptionTree();
214
215 inja::json& subscribedTopicsJson = json["subscribed_topics"];
216 for (const auto& [topic, clients] : subscribedTopics) {
217 inja::json topicJson;
218
219 topicJson["key"] = topic;
220 for (const auto& client : clients) {
221 std::ostringstream windowId("window");
222 for (char ch : client.first) {
223 if (std::isalnum(static_cast<unsigned char>(ch))) {
224 windowId << ch;
225 } else {
226 windowId << std::hex << std::uppercase << std::setw(2) << std::setfill('0')
227 << static_cast<int>(static_cast<unsigned char>(ch));
228 }
229 }
230
231 topicJson["values"].push_back({{"client_id", client.first},
232 {"link", href(client.first, "/client?" + client.first, windowId.str(), 450, 900)},
233 {"topic", topic},
234 {"qos", std::to_string(static_cast<int>(client.second))}});
235 }
236
237 subscribedTopicsJson.push_back(topicJson);
238 }
239
240 std::list<std::pair<std::string, std::string>> retainTree = broker->getRetainTree();
241
242 inja::json& retainedTopicsJson = json["retained_topics"];
243 for (const auto& [topic, message] : retainTree) {
244 inja::json topicJson;
245
246 topicJson["key"] = topic;
247 topicJson["values"].push_back(message);
248
249 retainedTopicsJson.push_back(topicJson);
250 }
251
252 return environment.render_file("OverviewPage.html", json);
253}
254
255static std::string getDetailedPage(inja::Environment environment, const mqtt::mqttbroker::lib::Mqtt* mqtt) {
256 return environment.render_file("DetailPage.html",
257 {{"table-id", "detail"},
258 {"client_id", mqtt->getClientId()},
259 {"title", mqtt->getClientId()},
260 {"header_row", {"Attribute", "Value"}},
261 {"data_rows",
262 inja::json::array({{"Client ID", mqtt->getClientId()},
263 {"Connection", mqtt->getConnectionName()},
264 {"Clean Session", mqtt->getCleanSession() ? "true" : "false"},
265 {"Connect Flags", std::to_string(mqtt->getConnectFlags())},
266 {"Username", mqtt->getUsername()},
267 {"Username Flag", mqtt->getUsernameFlag() ? "true" : "false"},
268 {"Password", mqtt->getPassword()},
269 {"Password Flag", mqtt->getPasswordFlag() ? "true" : "false"},
270 {"Keep Alive", std::to_string(mqtt->getKeepAlive())},
271 {"Protocol", mqtt->getProtocol()},
272 {"Protocol Level", std::to_string(mqtt->getLevel())},
273 {"Loop Prevention", !mqtt->getReflect() ? "true" : "false"},
274 {"Will Message", mqtt->getWillMessage()},
275 {"Will Topic", mqtt->getWillTopic()},
276 {"Will QoS", std::to_string(mqtt->getWillQoS())},
277 {"Will Flag", mqtt->getWillFlag() ? "true" : "false"},
278 {"Will Retain", mqtt->getWillRetain() ? "true" : "false"}})},
279 {"client_id", mqtt->getClientId()},
280 {"topics", mqtt->getSubscriptions()}});
281}
282
283static std::string getRedirectSpinnerPage(inja::Environment environment, const std::string& location) {
284 return environment.render_file("Spinner.html", {{"location", location}});
285}
286
287static std::string urlDecode(const std::string& encoded) {
288 std::string decoded;
289 size_t i = 0;
290
291 while (i < encoded.length()) {
292 char ch = encoded[i];
293 if (ch == '%') {
294 // Make sure there are at least two characters after '%'
295 if (i + 2 < encoded.length() && std::isxdigit(encoded[i + 1]) && std::isxdigit(encoded[i + 2])) {
296 // Convert the two hex digits to a character
297 std::string hexValue = encoded.substr(i + 1, 2);
298 char decodedChar = static_cast<char>(std::stoi(hexValue, nullptr, 16));
299 decoded.push_back(decodedChar);
300 i += 3; // Skip over the % and the two hex digits
301 } else {
302 // Malformed encoding, just add the '%' as is.
303 decoded.push_back(ch);
304 ++i;
305 }
306 } else if (ch == '+') {
307 // Convert '+' to space (common in URL encoding)
308 decoded.push_back(' ');
309 ++i;
310 } else {
311 // Regular character, just append it.
312 decoded.push_back(ch);
313 ++i;
314 }
315 }
316
317 return decoded;
318}
319
320static express::Router getRouter(const inja::Environment& environment, std::shared_ptr<iot::mqtt::server::broker::Broker> broker) {
321 const express::Router& jsonRouter = express::middleware::JsonMiddleware();
322
323 jsonRouter.post("/disconnect", [] APPLICATION(req, res) { // cppcheck-suppress unknownMacro
324 VLOG(1) << "POST /disconnect";
325
326 req->getAttribute<nlohmann::json>(
327 [&res](nlohmann::json& json) {
328 std::string jsonString = json.dump(4);
329
330 VLOG(1) << jsonString;
331
332 std::string clientId = json["client_id"].get<std::string>();
333 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
334
335 if (mqtt != nullptr) {
336 mqtt->getMqttContext()->getSocketConnection()->close();
337 res->send(jsonString);
338 } else {
339 res->status(404).send("MQTT client has never existed or already gone away: '" + clientId + "'");
340 }
341 },
342 [&res](const std::string& key) {
343 VLOG(1) << "Attribute type not found: " << key;
344
345 res->status(400).send("Attribute type not found: " + key);
346 });
347 });
348
349 jsonRouter.post("/unsubscribe", [] APPLICATION(req, res) {
350 VLOG(1) << "POST /unsubscribe";
351
352 req->getAttribute<nlohmann::json>(
353 [&res](nlohmann::json& json) {
354 std::string jsonString = json.dump(4);
355
356 VLOG(1) << jsonString;
357
358 std::string clientId = json["client_id"].get<std::string>();
359 std::string topic = json["topic"].get<std::string>();
360
361 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
362
363 if (mqtt != nullptr) {
364 mqtt->unsubscribe(topic);
365 res->send(jsonString);
366 } else {
367 res->status(404).send("MQTT client has never existed or already gone away: '" + clientId + "'");
368 }
369 },
370 [&res](const std::string& key) {
371 VLOG(1) << "Attribute type not found: " << key;
372
373 res->status(400).send("Attribute type not found: " + key);
374 });
375 });
376
377 jsonRouter.post("/release", [broker] APPLICATION(req, res) {
378 VLOG(1) << "POST /release";
379
380 req->getAttribute<nlohmann::json>(
381 [&res, broker](nlohmann::json& json) {
382 std::string jsonString = json.dump(4);
383
384 VLOG(1) << jsonString;
385
386 std::string topic = json["topic"].get<std::string>();
387
388 broker->release(topic);
389
390 res->send(jsonString);
391 },
392 [&res](const std::string& key) {
393 VLOG(1) << "Attribute type not found: " << key;
394
395 res->status(400).send("Attribute type not found: " + key);
396 });
397 });
398
399 jsonRouter.post("/subscribe", [] APPLICATION(req, res) {
400 VLOG(1) << "POST /subscribe";
401
402 req->getAttribute<nlohmann::json>(
403 [&res](nlohmann::json& json) {
404 std::string jsonString = json.dump(4);
405
406 VLOG(1) << jsonString;
407
408 std::string clientId = json["client_id"].get<std::string>();
409 std::string topic = json["topic"].get<std::string>();
410 uint8_t qoS = json["qos"].get<uint8_t>();
411
412 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
413
414 if (mqtt != nullptr) {
415 mqtt->subscribe(topic, qoS);
416 res->send(jsonString);
417 } else {
418 res->status(404).send("MQTT client has never existed or already gone away: '" + clientId + "'");
419 }
420 },
421 [&res](const std::string& key) {
422 VLOG(1) << "Attribute type not found: " << key;
423
424 res->status(400).send("Attribute type not found: " + key);
425 });
426 });
427 const express::Router router;
428
429 router.use(jsonRouter);
430
431 router.get("/clients", [environment, broker] APPLICATION(req, res) {
432 std::string responseString;
433 int responseStatus = 200;
434
435 try {
436 responseString = getOverviewPage(environment, broker, mqtt::mqttbroker::lib::MqttModel::instance());
437 } catch (const inja::InjaError& error) {
438 responseStatus = 500;
439 responseString = "Internal Server Error\n";
440 responseString += std::string(error.what()) + " " + error.type + " " + error.message + " " +
441 std::to_string(error.location.line) + ":" + std::to_string(error.location.column);
442 }
443
444 res->status(responseStatus).send(responseString);
445 });
446
447 router.get("/client", [environment] APPLICATION(req, res) {
448 std::string responseString;
449 int responseStatus = 200;
450
451 if (req->queries.size() == 1) {
452 const mqtt::mqttbroker::lib::Mqtt* mqtt =
453 mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(urlDecode(req->queries.begin()->first));
454
455 if (mqtt != nullptr) {
456 VLOG(1) << "Subscriptions for client " << mqtt->getClientId();
457 for (const std::string& subscription : mqtt->getSubscriptions()) {
458 VLOG(1) << " " << subscription;
459 }
460
461 try {
462 responseString = getDetailedPage(environment, mqtt);
463 } catch (const inja::InjaError& error) {
464 responseStatus = 500;
465 responseString = "Internal Server Error\n";
466 responseString += std::string(error.what()) + " " + error.type + " " + error.message + " " +
467 std::to_string(error.location.line) + ":" + std::to_string(error.location.column);
468 }
469 } else {
470 responseStatus = 404;
471 responseString = "Not Found: " + urlDecode(req->queries.begin()->first);
472 }
473 } else {
474 responseStatus = 400;
475 responseString = "Bad Request: No Client requested";
476 }
477
478 res->status(responseStatus).send(responseString);
479 });
480
481 router.get("/spinner", [environment] APPLICATION(req, res) {
482 std::string responseString;
483 int responseStatus = 200;
484
485 if (req->queries.size() == 1) {
486 responseString = getRedirectSpinnerPage(environment, req->queries.begin()->first);
487 } else {
488 responseStatus = 400;
489 responseString = "Bad Request: No Client requested";
490 }
491 res->status(responseStatus).send(responseString);
492 });
493
494 router.get("/ws", [] APPLICATION(req, res) {
495 if (req->headers.contains("upgrade")) {
496 upgrade(req, res);
497 } else {
498 res->redirect("/spinner?/clients");
499 }
500 });
501
502 router.get("/mqtt", [] APPLICATION(req, res) {
503 if (req->headers.contains("upgrade")) {
504 upgrade(req, res);
505 } else {
506 res->redirect("/spinner?/clients");
507 }
508 });
509
510 router.get("/", [] APPLICATION(req, res) {
511 if (req->headers.contains("upgrade")) {
512 upgrade(req, res);
513 } else {
514 res->redirect("/spinner?/clients");
515 }
516 });
517
518 router.get("/sse", [] APPLICATION(req, res) {
519 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
520 res->set("Content-Type", "text/event-stream") //
521 .set("Cache-Control", "no-cache")
522 .set("Connection", "keep-alive");
523 res->sendHeader();
524
525 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"));
526 } else {
527 res->redirect("/spinner?/clients");
528 }
529 });
530
531 return router;
532}
533
534static void
535reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
536 switch (state) {
537 case core::socket::State::OK:
538 VLOG(1) << instanceName << ": listening on '" << socketAddress.toString() << "'";
539 break;
540 case core::socket::State::DISABLED:
541 VLOG(1) << instanceName << ": disabled";
542 break;
543 case core::socket::State::ERROR:
544 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
545 break;
546 case core::socket::State::FATAL:
547 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
548 break;
549 }
550}
551
552int main(int argc, char* argv[]) {
553 utils::Config::addStringOption("--mqtt-mapping-file", "MQTT mapping file (json format) for integration", "[path]", "");
554 utils::Config::addStringOption("--mqtt-session-store", "Path to file for the persistent session store", "[path]", "");
555 utils::Config::addStringOption(
556 "--html-dir", "Path to html source directory", "[path]", std::string(CMAKE_INSTALL_PREFIX) + "/var/www/mqttsuite/mqttbroker");
557
558 core::SNodeC::init(argc, argv);
559
560 std::shared_ptr<iot::mqtt::server::broker::Broker> broker =
561 iot::mqtt::server::broker::Broker::instance(SUBSCRIPTION_MAX_QOS, utils::Config::getStringOptionValue("--mqtt-session-store"));
562
563#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
564 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
565 "in-mqtt",
566 [](auto& config) {
567 config.setPort(1883);
568 config.setRetry();
569 config.setDisableNagleAlgorithm();
570 },
571 broker)
572 .listen([](const auto& socketAddress, core::socket::State state) {
573 reportState("in-mqtt", socketAddress, state);
574 });
575
576#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
577 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
578 "in-mqtts",
579 [](auto& config) {
580 config.setPort(8883);
581 config.setRetry();
582 config.setDisableNagleAlgorithm();
583 },
584 broker)
585 .listen([](const auto& socketAddress, core::socket::State state) {
586 reportState("in-mqtts", socketAddress, state);
587 });
588#endif
589#endif
590
591#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
592 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
593 "in6-mqtt",
594 [](auto& config) {
595 config.setPort(1883);
596 config.setRetry();
597 config.setDisableNagleAlgorithm();
598
599 config.setIPv6Only();
600 },
601 broker)
602 .listen([](const auto& socketAddress, core::socket::State state) {
603 reportState("in6-mqtt", socketAddress, state);
604 });
605
606#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
607 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
608 "in6-mqtts",
609 [](auto& config) {
610 config.setPort(8883);
611 config.setRetry();
612 config.setDisableNagleAlgorithm();
613
614 config.setIPv6Only();
615 },
616 broker)
617 .listen([](const auto& socketAddress, core::socket::State state) {
618 reportState("in6-mqtts", socketAddress, state);
619 });
620#endif
621#endif
622
623#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
624 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
625 "un-mqtt",
626 [](auto& config) {
627 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
628 config.setRetry();
629 },
630 broker)
631 .listen([](const auto& socketAddress, core::socket::State state) {
632 reportState("un-mqtt", socketAddress, state);
633 });
634
635#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
636 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
637 "un-mqtts",
638 [](auto& config) {
639 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
640 config.setRetry();
641 },
642 broker)
643 .listen([](const auto& socketAddress, core::socket::State state) {
644 reportState("un-mqtts", socketAddress, state);
645 });
646#endif
647#endif
648
649 inja::Environment environment{utils::Config::getStringOptionValue("--html-dir") + "/"};
650 express::Router router = getRouter(environment, broker);
651
652#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
653 express::legacy::in::Server("in-http", router, reportState, [](auto& config) {
654 config.setPort(8080);
655 config.setRetry();
656 config.setDisableNagleAlgorithm();
657 });
658
659#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
660 express::tls::in::Server("in-https", router, reportState, [](auto& config) {
661 config.setPort(8088);
662 config.setRetry();
663 config.setDisableNagleAlgorithm();
664 });
665#endif
666#endif
667
668#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
669 express::legacy::in6::Server("in6-http", router, reportState, [](auto& config) {
670 config.setPort(8080);
671 config.setRetry();
672 config.setDisableNagleAlgorithm();
673
674 config.setIPv6Only();
675 });
676
677#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
678 express::tls::in6::Server("in6-https", router, reportState, [](auto& config) {
679 config.setPort(8088);
680 config.setRetry();
681 config.setDisableNagleAlgorithm();
682
683 config.setIPv6Only();
684 });
685#endif
686#endif
687
688#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
689 express::legacy::un::Server("un-http", router, reportState, [](auto& config) {
690 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
691 });
692
693#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
694 express::tls::un::Server("un-https", router, reportState, [](auto& config) {
695 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
696 });
697#endif
698#endif
699
700 return core::SNodeC::start();
701}
Class for changing the configuration.
Definition inja.hpp:2851
static MqttModel & instance()
Definition MqttModel.cpp:76
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId)
const Mqtt * getMqtt(const std::string &clientId)
int main(int argc, char *argv[])
static std::string getDetailedPage(inja::Environment environment, const mqtt::mqttbroker::lib::Mqtt *mqtt)
static std::string urlDecode(const std::string &encoded)
static std::string href(const std::string &text, const std::string &link)
static express::Router getRouter(const inja::Environment &environment, std::shared_ptr< iot::mqtt::server::broker::Broker > broker)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static std::string href(const std::string &text, const std::string &url, const std::string &windowId, uint16_t width, uint16_t height)
static std::string getOverviewPage(inja::Environment environment, std::shared_ptr< iot::mqtt::server::broker::Broker > broker, mqtt::mqttbroker::lib::MqttModel &mqttModel)
static std::string getRedirectSpinnerPage(inja::Environment environment, const std::string &location)
const std::string type
Definition inja.hpp:256
const SourceLocation location
Definition inja.hpp:259
const std::string message
Definition inja.hpp:257