MQTTSuite
Loading...
Searching...
No Matches
mqttbroker.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "SocketContextFactory.h" // IWYU pragma: keep
43#include "config.h"
44#include "lib/ConfigApplication.h"
45#include "lib/Mqtt.h"
46#include "lib/MqttModel.h"
47
48#include <core/SNodeC.h>
49#include <utils/Config.h>
50//
51#include <express/middleware/JsonMiddleware.h>
52#include <express/middleware/StaticMiddleware.h>
53#include <iot/mqtt/MqttContext.h>
54#include <iot/mqtt/server/broker/Broker.h>
55
56#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
57#include <express/legacy/in/Server.h>
58#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
59#include <express/tls/in/Server.h>
60#endif
61#endif
62
63#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
64#include <express/legacy/in6/Server.h>
65#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
66#include <express/tls/in6/Server.h>
67#endif
68#endif
69
70#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
71#include <express/legacy/un/Server.h>
72#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
73#include <express/tls/un/Server.h>
74#endif
75#endif
76
77//
78#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
79#include <net/in/stream/legacy/SocketServer.h>
80#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
81#include <net/in/stream/tls/SocketServer.h>
82#endif
83#endif
84
85#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
86#include <net/in6/stream/legacy/SocketServer.h>
87#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
88#include <net/in6/stream/tls/SocketServer.h>
89#endif
90#endif
91
92#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
93#include <net/un/stream/legacy/SocketServer.h>
94#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
95#include <net/un/stream/tls/SocketServer.h>
96#endif
97#endif
98
99#ifndef DOXYGEN_SHOULD_SKIP_THIS
100
101#include <web/http/http_utils.h>
102//
103#include <nlohmann/json.hpp>
104// IWYU pragma: no_include <nlohmann/json_fwd.hpp>
105//
106#include <log/Logger.h>
107//
108#include <utility>
109
110#endif
111
112static void upgrade APPLICATION(req, res) {
113 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
114
115 VLOG(2) << connectionName << " HTTP: Upgrade request:\n"
116 << httputils::toString(req->method,
117 req->url,
118 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
119 req->queries,
120 req->headers,
121 req->trailer,
122 req->cookies,
123 std::vector<char>());
124
125 if (req->get("sec-websocket-protocol").find("mqtt") != std::string::npos) {
126 res->upgrade(req, [req, res, connectionName](const std::string& name) {
127 if (!name.empty()) {
128 VLOG(1) << connectionName << ": Successful upgrade:";
129 VLOG(1) << connectionName << ": Selected: " << name;
130 VLOG(1) << connectionName << ": Requested: " << req->get("sec-websocket-protocol");
131
132 res->end();
133 } else {
134 VLOG(1) << connectionName << ": Can not upgrade to any of '" << req->get("upgrade") << "'";
135
136 res->sendStatus(404);
137 }
138 });
139 } else {
140 VLOG(1) << connectionName << ": Unsupported subprotocol(s):";
141 VLOG(1) << " Expected: mqtt";
142 VLOG(1) << " Requested: " << req->get("sec-websocket-protocol");
143
144 res->sendStatus(404);
145 }
146}
147
148static express::Router getRouter(std::shared_ptr<iot::mqtt::server::broker::Broker> broker, const std::string& webRoot) {
149 const express::Router& jsonRouter = express::middleware::JsonMiddleware();
150
151 /*
152 * /api/mqtt/disconnect
153 * JSON.stringify({ clientId })
154 */
155 jsonRouter.use("/api/mqtt", [] MIDDLEWARE(req, res, next) { // cppcheck-suppress unknownMacro
156 res->set({{"Access-Control-Allow-Origin", "*"},
157 {"Access-Control-Allow-Headers", "Content-Type"},
158 {"Access-Control-Allow-Methods", "GET, OPTIONS, POST"},
159 {"Access-Control-Allow-Private-Network", "true"}});
160 next();
161 });
162
163 jsonRouter.post("/api/mqtt/disconnect", [] APPLICATION(req, res) {
164 VLOG(1) << "POST /disconnect";
165
166 req->getAttribute<nlohmann::json>(
167 [&res](nlohmann::json& json) {
168 std::string jsonString = json.dump(4);
169
170 VLOG(1) << jsonString;
171
172 std::string clientId = json["clientId"].get<std::string>();
173 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
174
175 if (mqtt != nullptr) {
176 mqtt->getMqttContext()->getSocketConnection()->close();
177 res->send(R"({"success": true, "message": "Client disconnected successfully"})"_json.dump());
178 } else {
179 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
180 }
181 },
182 [&res](const std::string& key) {
183 VLOG(1) << "Attribute type not found: " << key;
184
185 res->status(400).send("Attribute type not found: " + key);
186 });
187 });
188
189 /*
190 * /api/mqtt/unsubscribe
191 * JSON.stringify({clientId, topic})
192 */
193 jsonRouter.post("/api/mqtt/unsubscribe", [] APPLICATION(req, res) {
194 VLOG(1) << "POST /unsubscribe";
195
196 req->getAttribute<nlohmann::json>(
197 [&res](nlohmann::json& json) {
198 std::string jsonString = json.dump(4);
199
200 VLOG(1) << jsonString;
201
202 std::string clientId = json["clientId"].get<std::string>();
203 std::string topic = json["topic"].get<std::string>();
204
205 mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
206
207 if (mqtt != nullptr) {
208 mqtt->unsubscribe(topic);
209 res->send(R"({"success": true, "message": "Client unsubscribed successfully"})"_json.dump());
210 } else {
211 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
212 }
213 },
214 [&res](const std::string& key) {
215 VLOG(1) << "Attribute type not found: " << key;
216
217 res->status(400).send("Attribute type not found: " + key);
218 });
219 });
220
221 /*
222 * /api/mqtt/release
223 * JSON.stringify({ topic })
224 */
225 jsonRouter.post("/api/mqtt/release", [broker] APPLICATION(req, res) {
226 VLOG(1) << "POST /release";
227
228 req->getAttribute<nlohmann::json>(
229 [&res, broker](nlohmann::json& json) {
230 std::string jsonString = json.dump(4);
231
232 VLOG(1) << jsonString;
233
234 std::string topic = json["topic"].get<std::string>();
235
236 broker->publish("", topic, "", 0, true);
237 mqtt::mqttbroker::lib::MqttModel::instance().publishMessage(topic, "", 0, true);
238
239 res->send(R"({"success": true, "message": "Retained message released successfully"})"_json.dump());
240 },
241 [&res](const std::string& key) {
242 VLOG(1) << "Attribute type not found: " << key;
243
244 res->status(400).send("Attribute type not found: " + key);
245 });
246 });
247
248 /*
249 * /api/mqtt/subscribe
250 * JSON.stringify({ clientId, topic, qos })
251 */
252 jsonRouter.post("/api/mqtt/subscribe", [] APPLICATION(req, res) {
253 VLOG(1) << "POST /subscribe";
254
255 req->getAttribute<nlohmann::json>(
256 [&res](nlohmann::json& json) {
257 std::string jsonString = json.dump(4);
258
259 VLOG(1) << jsonString;
260
261 std::string clientId = json["clientId"].get<std::string>();
262 std::string topic = json["topic"].get<std::string>();
263 uint8_t qoS = json["qos"].get<uint8_t>();
264
265 mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
266
267 if (mqtt != nullptr) {
268 mqtt->subscribe(topic, qoS);
269
270 res->send(R"({"success": true, "message": "Client subscribed successfully"})"_json.dump());
271 } else {
272 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
273 }
274 },
275 [&res](const std::string& key) {
276 VLOG(1) << "Attribute type not found: " << key;
277
278 res->status(400).send("Attribute type not found: " + key);
279 });
280 });
281 const express::Router router;
282
283 router.use(jsonRouter);
284
285 router.get("/api/mqtt/events", [broker] APPLICATION(req, res) {
286 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
287 res->set({{"Content-Type", "text/event-stream"},
288 {"Cache-Control", "no-cache"},
289 {"Connection", "keep-alive"},
290 {"Access-Control-Allow-Origin", "*"}});
291
292 res->sendHeader();
293
294 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"), broker);
295 } else {
296 res->redirect("/clients");
297 }
298 });
299
300 router.get("/ws", [] APPLICATION(req, res) {
301 if (req->headers.contains("upgrade")) {
302 upgrade(req, res);
303 } else {
304 res->redirect("/clients");
305 }
306 });
307
308 router.get("/mqtt", [] APPLICATION(req, res) {
309 if (req->headers.contains("upgrade")) {
310 upgrade(req, res);
311 } else {
312 res->redirect("/clients");
313 }
314 });
315
316 router.get("/", [] APPLICATION(req, res) {
317 if (req->headers.contains("upgrade")) {
318 upgrade(req, res);
319 } else {
320 res->redirect("/clients");
321 }
322 });
323
324 router.get("/sse", [broker] APPLICATION(req, res) {
325 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
326 res->set("Content-Type", "text/event-stream") //
327 .set("Cache-Control", "no-cache")
328 .set("Connection", "keep-alive");
329 res->sendHeader();
330
331 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"), broker);
332 } else {
333 res->redirect("/clients");
334 }
335 });
336
337 router.get("/clients", [] APPLICATION(req, res) {
338 res->redirect("/clients/index.html");
339 });
340
341 router.use("/clients", express::middleware::StaticMiddleware(webRoot));
342
343 router.get("*", [] APPLICATION(req, res) {
344 res->redirect("/clients/index.html");
345 });
346
347 return router;
348}
349
350static void
351reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
352 switch (state) {
353 case core::socket::State::OK:
354 VLOG(1) << instanceName << ": listening on '" << socketAddress.toString() << "'";
355 break;
356 case core::socket::State::DISABLED:
357 VLOG(1) << instanceName << ": disabled";
358 break;
359 case core::socket::State::ERROR:
360 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
361 break;
362 case core::socket::State::FATAL:
363 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
364 break;
365 }
366}
367
368int main(int argc, char* argv[]) {
369 utils::Config::configRoot.newSubCommand<mqtt::lib::ConfigMqttBroker>()->setHtmlRoot(std::string(CMAKE_INSTALL_PREFIX) +
370 "/var/www/mqttsuite/mqttbroker");
371
372 core::SNodeC::init(argc, argv);
373
374 std::shared_ptr<iot::mqtt::server::broker::Broker> broker = iot::mqtt::server::broker::Broker::instance(
375 SUBSCRIPTION_MAX_QOS, utils::Config::configRoot.getSubCommand<mqtt::lib::ConfigMqttBroker>()->getSessionStore());
376
377#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
378 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>( //
379 "in-mqtt",
380 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
381 config->setPort(1883);
382 config->setRetry();
383 config->setDisableNagleAlgorithm();
384 },
385 broker)
386 .listen([](const auto& socketAddress, core::socket::State state) {
387 reportState("in-mqtt", socketAddress, state);
388 });
389
390#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
391 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>( //
392 "in-mqtts",
393 [](net::in::stream::tls::config::ConfigSocketServer* config) {
394 config->setPort(8883);
395 config->setRetry();
396 config->setDisableNagleAlgorithm();
397 },
398 broker)
399 .listen([](const auto& socketAddress, core::socket::State state) {
400 reportState("in-mqtts", socketAddress, state);
401 });
402#endif
403#endif
404
405#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
406 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>( //
407 "in6-mqtt",
408 [](net::in6::stream::legacy::config::ConfigSocketServer* config) {
409 config->setPort(1883);
410 config->setRetry();
411 config->setDisableNagleAlgorithm();
412
413 config->setIPv6Only();
414 },
415 broker)
416 .listen([](const auto& socketAddress, core::socket::State state) {
417 reportState("in6-mqtt", socketAddress, state);
418 });
419
420#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
421 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>( //
422 "in6-mqtts",
423 [](net::in6::stream::tls::config::ConfigSocketServer* config) {
424 config->setPort(8883);
425 config->setRetry();
426 config->setDisableNagleAlgorithm();
427
428 config->setIPv6Only();
429 },
430 broker)
431 .listen([](const auto& socketAddress, core::socket::State state) {
432 reportState("in6-mqtts", socketAddress, state);
433 });
434#endif
435#endif
436
437#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
438 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>( //
439 "un-mqtt",
440 [](net::un::stream::legacy::config::ConfigSocketServer* config) {
441 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
442 config->setRetry();
443 },
444 broker)
445 .listen([](const auto& socketAddress, core::socket::State state) {
446 reportState("un-mqtt", socketAddress, state);
447 });
448
449#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
450 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>( //
451 "un-mqtts",
452 [](net::un::stream::tls::config::ConfigSocketServer* config) {
453 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
454 config->setRetry();
455 },
456 broker)
457 .listen([](const auto& socketAddress, core::socket::State state) {
458 reportState("un-mqtts", socketAddress, state);
459 });
460#endif
461#endif
462 express::Router router = getRouter(broker, utils::Config::configRoot.getSubCommand<mqtt::lib::ConfigMqttBroker>()->getHtmlRoot());
463
464#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
465 express::legacy::in::Server( //
466 "in-http",
467 router,
468 reportState,
469 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
470 config->setPort(8080);
471 config->setRetry();
472 config->setDisableNagleAlgorithm();
473 });
474
475#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
476 express::tls::in::Server( //
477 "in-https",
478 router,
479 reportState,
480 [](net::in::stream::tls::config::ConfigSocketServer* config) {
481 config->setPort(8088);
482 config->setRetry();
483 config->setDisableNagleAlgorithm();
484 });
485#endif
486#endif
487
488#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
489 express::legacy::in6::Server( //
490 "in6-http",
491 router,
492 reportState,
493 [](net::in6::stream::legacy::config::ConfigSocketServer* config) {
494 config->setPort(8080);
495 config->setRetry();
496 config->setDisableNagleAlgorithm();
497
498 config->setIPv6Only();
499 });
500
501#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
502 express::tls::in6::Server( //
503 "in6-https",
504 router,
505 reportState,
506 [](net::in6::stream::tls::config::ConfigSocketServer* config) {
507 config->setPort(8088);
508 config->setRetry();
509 config->setDisableNagleAlgorithm();
510
511 config->setIPv6Only();
512 });
513#endif
514#endif
515
516#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
517 express::legacy::un::Server( //
518 "un-http",
519 router,
520 reportState,
521 [](net::un::stream::legacy::config::ConfigSocketServer* config) {
522 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
523 });
524
525#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
526 express::tls::un::Server( //
527 "un-https",
528 router,
529 reportState,
530 [](net::un::stream::tls::config::ConfigSocketServer* config) {
531 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
532 });
533#endif
534#endif
535
536 return core::SNodeC::start();
537}
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
int main(int argc, char *argv[])
static express::Router getRouter(std::shared_ptr< iot::mqtt::server::broker::Broker > broker, const std::string &webRoot)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static void upgrade(const std::shared_ptr< express::Request > &req, const std::shared_ptr< express::Response > &res)