MQTTSuite
Loading...
Searching...
No Matches
mqttbroker.cpp File Reference
#include "SocketContextFactory.h"
#include "config.h"
#include "lib/ConfigApplication.h"
#include "lib/Mqtt.h"
#include "lib/MqttModel.h"
#include <core/SNodeC.h>
#include <utils/Config.h>
#include <express/middleware/JsonMiddleware.h>
#include <express/middleware/StaticMiddleware.h>
#include <iot/mqtt/MqttContext.h>
#include <iot/mqtt/server/broker/Broker.h>
Include dependency graph for mqttbroker.cpp:

Go to the source code of this file.

Functions

static void upgrade (const std::shared_ptr< express::Request > &req, const std::shared_ptr< express::Response > &res)
static express::Router getRouter (std::shared_ptr< iot::mqtt::server::broker::Broker > broker, const std::string &webRoot)
static void reportState (const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
int main (int argc, char *argv[])

Function Documentation

◆ getRouter()

express::Router getRouter ( std::shared_ptr< iot::mqtt::server::broker::Broker > broker,
const std::string & webRoot )
static

Definition at line 148 of file mqttbroker.cpp.

148 {
149 const express::Router& jsonRouter = express::middleware::JsonMiddleware();
150
151 /*
152 * /api/mqtt/disconnect
153 * JSON.stringify({ clientId })
154 */
155 jsonRouter.use("/api/mqtt", [] MIDDLEWARE(req, res, next) { // cppcheck-suppress unknownMacro
156 res->set({{"Access-Control-Allow-Origin", "*"},
157 {"Access-Control-Allow-Headers", "Content-Type"},
158 {"Access-Control-Allow-Methods", "GET, OPTIONS, POST"},
159 {"Access-Control-Allow-Private-Network", "true"}});
160 next();
161 });
162
163 jsonRouter.post("/api/mqtt/disconnect", [] APPLICATION(req, res) {
164 VLOG(1) << "POST /disconnect";
165
166 req->getAttribute<nlohmann::json>(
167 [&res](nlohmann::json& json) {
168 std::string jsonString = json.dump(4);
169
170 VLOG(1) << jsonString;
171
172 std::string clientId = json["clientId"].get<std::string>();
174
175 if (mqtt != nullptr) {
176 mqtt->getMqttContext()->getSocketConnection()->close();
177 res->send(R"({"success": true, "message": "Client disconnected successfully"})"_json.dump());
178 } else {
179 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
180 }
181 },
182 [&res](const std::string& key) {
183 VLOG(1) << "Attribute type not found: " << key;
184
185 res->status(400).send("Attribute type not found: " + key);
186 });
187 });
188
189 /*
190 * /api/mqtt/unsubscribe
191 * JSON.stringify({clientId, topic})
192 */
193 jsonRouter.post("/api/mqtt/unsubscribe", [] APPLICATION(req, res) {
194 VLOG(1) << "POST /unsubscribe";
195
196 req->getAttribute<nlohmann::json>(
197 [&res](nlohmann::json& json) {
198 std::string jsonString = json.dump(4);
199
200 VLOG(1) << jsonString;
201
202 std::string clientId = json["clientId"].get<std::string>();
203 std::string topic = json["topic"].get<std::string>();
204
206
207 if (mqtt != nullptr) {
208 mqtt->unsubscribe(topic);
209 res->send(R"({"success": true, "message": "Client unsubscribed successfully"})"_json.dump());
210 } else {
211 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
212 }
213 },
214 [&res](const std::string& key) {
215 VLOG(1) << "Attribute type not found: " << key;
216
217 res->status(400).send("Attribute type not found: " + key);
218 });
219 });
220
221 /*
222 * /api/mqtt/release
223 * JSON.stringify({ topic })
224 */
225 jsonRouter.post("/api/mqtt/release", [broker] APPLICATION(req, res) {
226 VLOG(1) << "POST /release";
227
228 req->getAttribute<nlohmann::json>(
229 [&res, broker](nlohmann::json& json) {
230 std::string jsonString = json.dump(4);
231
232 VLOG(1) << jsonString;
233
234 std::string topic = json["topic"].get<std::string>();
235
236 broker->publish("", topic, "", 0, true);
238
239 res->send(R"({"success": true, "message": "Retained message released successfully"})"_json.dump());
240 },
241 [&res](const std::string& key) {
242 VLOG(1) << "Attribute type not found: " << key;
243
244 res->status(400).send("Attribute type not found: " + key);
245 });
246 });
247
248 /*
249 * /api/mqtt/subscribe
250 * JSON.stringify({ clientId, topic, qos })
251 */
252 jsonRouter.post("/api/mqtt/subscribe", [] APPLICATION(req, res) {
253 VLOG(1) << "POST /subscribe";
254
255 req->getAttribute<nlohmann::json>(
256 [&res](nlohmann::json& json) {
257 std::string jsonString = json.dump(4);
258
259 VLOG(1) << jsonString;
260
261 std::string clientId = json["clientId"].get<std::string>();
262 std::string topic = json["topic"].get<std::string>();
263 uint8_t qoS = json["qos"].get<uint8_t>();
264
266
267 if (mqtt != nullptr) {
268 mqtt->subscribe(topic, qoS);
269
270 res->send(R"({"success": true, "message": "Client subscribed successfully"})"_json.dump());
271 } else {
272 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
273 }
274 },
275 [&res](const std::string& key) {
276 VLOG(1) << "Attribute type not found: " << key;
277
278 res->status(400).send("Attribute type not found: " + key);
279 });
280 });
281 const express::Router router;
282
283 router.use(jsonRouter);
284
285 router.get("/api/mqtt/events", [broker] APPLICATION(req, res) {
286 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
287 res->set({{"Content-Type", "text/event-stream"},
288 {"Cache-Control", "no-cache"},
289 {"Connection", "keep-alive"},
290 {"Access-Control-Allow-Origin", "*"}});
291
292 res->sendHeader();
293
294 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"), broker);
295 } else {
296 res->redirect("/clients");
297 }
298 });
299
300 router.get("/ws", [] APPLICATION(req, res) {
301 if (req->headers.contains("upgrade")) {
302 upgrade(req, res);
303 } else {
304 res->redirect("/clients");
305 }
306 });
307
308 router.get("/mqtt", [] APPLICATION(req, res) {
309 if (req->headers.contains("upgrade")) {
310 upgrade(req, res);
311 } else {
312 res->redirect("/clients");
313 }
314 });
315
316 router.get("/", [] APPLICATION(req, res) {
317 if (req->headers.contains("upgrade")) {
318 upgrade(req, res);
319 } else {
320 res->redirect("/clients");
321 }
322 });
323
324 router.get("/sse", [broker] APPLICATION(req, res) {
325 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
326 res->set("Content-Type", "text/event-stream") //
327 .set("Cache-Control", "no-cache")
328 .set("Connection", "keep-alive");
329 res->sendHeader();
330
331 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"), broker);
332 } else {
333 res->redirect("/clients");
334 }
335 });
336
337 router.get("/clients", [] APPLICATION(req, res) {
338 res->redirect("/clients/index.html");
339 });
340
341 router.use("/clients", express::middleware::StaticMiddleware(webRoot));
342
343 router.get("*", [] APPLICATION(req, res) {
344 res->redirect("/clients/index.html");
345 });
346
347 return router;
348}
nlohmann::json json
void publishMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
Mqtt * getMqtt(const std::string &clientId) const
static void upgrade(const std::shared_ptr< express::Request > &req, const std::shared_ptr< express::Response > &res)

References mqtt::mqttbroker::lib::MqttModel::addEventReceiver(), mqtt::mqttbroker::lib::MqttModel::instance(), and upgrade().

Here is the call graph for this function:

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 368 of file mqttbroker.cpp.

368 {
369 utils::Config::configRoot.newSubCommand<mqtt::lib::ConfigMqttBroker>()->setHtmlRoot(std::string(CMAKE_INSTALL_PREFIX) +
370 "/var/www/mqttsuite/mqttbroker");
371
372 core::SNodeC::init(argc, argv);
373
374 std::shared_ptr<iot::mqtt::server::broker::Broker> broker = iot::mqtt::server::broker::Broker::instance(
375 SUBSCRIPTION_MAX_QOS, utils::Config::configRoot.getSubCommand<mqtt::lib::ConfigMqttBroker>()->getSessionStore());
376
377#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
378 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>( //
379 "in-mqtt",
380 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
381 config->setPort(1883);
382 config->setRetry();
383 config->setDisableNagleAlgorithm();
384 },
385 broker)
386 .listen([](const auto& socketAddress, core::socket::State state) {
387 reportState("in-mqtt", socketAddress, state);
388 });
389
390#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
391 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>( //
392 "in-mqtts",
393 [](net::in::stream::tls::config::ConfigSocketServer* config) {
394 config->setPort(8883);
395 config->setRetry();
396 config->setDisableNagleAlgorithm();
397 },
398 broker)
399 .listen([](const auto& socketAddress, core::socket::State state) {
400 reportState("in-mqtts", socketAddress, state);
401 });
402#endif
403#endif
404
405#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
406 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>( //
407 "in6-mqtt",
408 [](net::in6::stream::legacy::config::ConfigSocketServer* config) {
409 config->setPort(1883);
410 config->setRetry();
411 config->setDisableNagleAlgorithm();
412
413 config->setIPv6Only();
414 },
415 broker)
416 .listen([](const auto& socketAddress, core::socket::State state) {
417 reportState("in6-mqtt", socketAddress, state);
418 });
419
420#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
421 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>( //
422 "in6-mqtts",
423 [](net::in6::stream::tls::config::ConfigSocketServer* config) {
424 config->setPort(8883);
425 config->setRetry();
426 config->setDisableNagleAlgorithm();
427
428 config->setIPv6Only();
429 },
430 broker)
431 .listen([](const auto& socketAddress, core::socket::State state) {
432 reportState("in6-mqtts", socketAddress, state);
433 });
434#endif
435#endif
436
437#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
438 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>( //
439 "un-mqtt",
440 [](net::un::stream::legacy::config::ConfigSocketServer* config) {
441 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
442 config->setRetry();
443 },
444 broker)
445 .listen([](const auto& socketAddress, core::socket::State state) {
446 reportState("un-mqtt", socketAddress, state);
447 });
448
449#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
450 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>( //
451 "un-mqtts",
452 [](net::un::stream::tls::config::ConfigSocketServer* config) {
453 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
454 config->setRetry();
455 },
456 broker)
457 .listen([](const auto& socketAddress, core::socket::State state) {
458 reportState("un-mqtts", socketAddress, state);
459 });
460#endif
461#endif
462 express::Router router = getRouter(broker, utils::Config::configRoot.getSubCommand<mqtt::lib::ConfigMqttBroker>()->getHtmlRoot());
463
464#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
465 express::legacy::in::Server( //
466 "in-http",
467 router,
469 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
470 config->setPort(8080);
471 config->setRetry();
472 config->setDisableNagleAlgorithm();
473 });
474
475#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
476 express::tls::in::Server( //
477 "in-https",
478 router,
480 [](net::in::stream::tls::config::ConfigSocketServer* config) {
481 config->setPort(8088);
482 config->setRetry();
483 config->setDisableNagleAlgorithm();
484 });
485#endif
486#endif
487
488#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
489 express::legacy::in6::Server( //
490 "in6-http",
491 router,
493 [](net::in6::stream::legacy::config::ConfigSocketServer* config) {
494 config->setPort(8080);
495 config->setRetry();
496 config->setDisableNagleAlgorithm();
497
498 config->setIPv6Only();
499 });
500
501#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
502 express::tls::in6::Server( //
503 "in6-https",
504 router,
506 [](net::in6::stream::tls::config::ConfigSocketServer* config) {
507 config->setPort(8088);
508 config->setRetry();
509 config->setDisableNagleAlgorithm();
510
511 config->setIPv6Only();
512 });
513#endif
514#endif
515
516#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
517 express::legacy::un::Server( //
518 "un-http",
519 router,
521 [](net::un::stream::legacy::config::ConfigSocketServer* config) {
522 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
523 });
524
525#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
526 express::tls::un::Server( //
527 "un-https",
528 router,
530 [](net::un::stream::tls::config::ConfigSocketServer* config) {
531 config->setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config->getInstanceName());
532 });
533#endif
534#endif
535
536 return core::SNodeC::start();
537}
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static express::Router getRouter(std::shared_ptr< iot::mqtt::server::broker::Broker > broker, const std::string &webRoot)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)

◆ reportState()

void reportState ( const std::string & instanceName,
const core::socket::SocketAddress & socketAddress,
const core::socket::State & state )
static

Definition at line 351 of file mqttbroker.cpp.

351 {
352 switch (state) {
353 case core::socket::State::OK:
354 VLOG(1) << instanceName << ": listening on '" << socketAddress.toString() << "'";
355 break;
356 case core::socket::State::DISABLED:
357 VLOG(1) << instanceName << ": disabled";
358 break;
359 case core::socket::State::ERROR:
360 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
361 break;
362 case core::socket::State::FATAL:
363 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
364 break;
365 }
366}

◆ upgrade()

void upgrade ( const std::shared_ptr< express::Request > & req,
const std::shared_ptr< express::Response > & res )
static

Definition at line 112 of file mqttbroker.cpp.

112 {
113 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
114
115 VLOG(2) << connectionName << " HTTP: Upgrade request:\n"
116 << httputils::toString(req->method,
117 req->url,
118 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
119 req->queries,
120 req->headers,
121 req->trailer,
122 req->cookies,
123 std::vector<char>());
124
125 if (req->get("sec-websocket-protocol").find("mqtt") != std::string::npos) {
126 res->upgrade(req, [req, res, connectionName](const std::string& name) {
127 if (!name.empty()) {
128 VLOG(1) << connectionName << ": Successful upgrade:";
129 VLOG(1) << connectionName << ": Selected: " << name;
130 VLOG(1) << connectionName << ": Requested: " << req->get("sec-websocket-protocol");
131
132 res->end();
133 } else {
134 VLOG(1) << connectionName << ": Can not upgrade to any of '" << req->get("upgrade") << "'";
135
136 res->sendStatus(404);
137 }
138 });
139 } else {
140 VLOG(1) << connectionName << ": Unsupported subprotocol(s):";
141 VLOG(1) << " Expected: mqtt";
142 VLOG(1) << " Requested: " << req->get("sec-websocket-protocol");
143
144 res->sendStatus(404);
145 }
146}

Referenced by getRouter().

Here is the caller graph for this function: