MQTTSuite
Loading...
Searching...
No Matches
mqttbroker.cpp File Reference
#include "SocketContextFactory.h"
#include "config.h"
#include "lib/Mqtt.h"
#include "lib/MqttModel.h"
#include "lib/inja.hpp"
Include dependency graph for mqttbroker.cpp:

Go to the source code of this file.

Functions

static void upgrade APPLICATION (req, res)
static express::Router getRouter (const inja::Environment &environment, std::shared_ptr< iot::mqtt::server::broker::Broker > broker)
static void reportState (const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
int main (int argc, char *argv[])

Function Documentation

◆ APPLICATION()

void upgrade APPLICATION ( req ,
res  )
static

Definition at line 130 of file mqttbroker.cpp.

// Body of the static `upgrade` handler, declared through the APPLICATION(req, res) macro.
// Logs the incoming upgrade request and upgrades the connection only when the
// "sec-websocket-protocol" request header contains "mqtt"; otherwise replies with status 404.
130 {
131 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
132
// Dump the request line, queries, headers, trailer and cookies at verbosity level 2 (the body argument is an empty vector).
133 VLOG(2) << connectionName << " HTTP: Upgrade request:\n"
134 << httputils::toString(req->method,
135 req->url,
136 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
137 req->queries,
138 req->headers,
139 req->trailer,
140 req->cookies,
141 std::vector<char>());
142
// Substring search: any header value that contains "mqtt" passes this check.
143 if (req->get("sec-websocket-protocol").find("mqtt") != std::string::npos) {
// The callback receives a name; a non-empty name is logged as the selected protocol,
// an empty name is treated as a failed upgrade and answered with 404.
144 res->upgrade(req, [req, res, connectionName](const std::string& name) {
145 if (!name.empty()) {
146 VLOG(1) << connectionName << ": Successful upgrade:";
147 VLOG(1) << connectionName << ": Selected: " << name;
148 VLOG(1) << connectionName << ": Requested: " << req->get("sec-websocket-protocol");
149
150 res->end();
151 } else {
152 VLOG(1) << connectionName << ": Can not upgrade to any of '" << req->get("upgrade") << "'";
153
154 res->sendStatus(404);
155 }
156 });
157 } else {
// No "mqtt" in the requested subprotocols: log what was requested and reject with 404.
158 VLOG(1) << connectionName << ": Unsupported subprotocol(s):";
159 VLOG(1) << " Expected: mqtt";
160 VLOG(1) << " Requested: " << req->get("sec-websocket-protocol");
161
162 res->sendStatus(404);
163 }
164}

◆ getRouter()

express::Router getRouter ( const inja::Environment & environment,
std::shared_ptr< iot::mqtt::server::broker::Broker > broker )
static

Definition at line 166 of file mqttbroker.cpp.

// Body of getRouter(): builds and returns the express::Router with the broker's HTTP routes:
//  - JSON POST handlers under /api/mqtt (disconnect, unsubscribe, release, subscribe),
//  - text/event-stream endpoints at /api/mqtt/events and /sse,
//  - /clients (redirect to index.html plus StaticMiddleware on the --html-dir option value),
//  - /ws, /mqtt and / which call upgrade() when the request carries an "upgrade" header.
// NOTE(review): the `environment` parameter is not referenced in the lines visible in this listing.
// NOTE(review): listing lines 192, 224, 256 and 284 are absent from this listing; see the notes at those positions.
167 {
168 const express::Router& jsonRouter = express::middleware::JsonMiddleware();
169
170 /*
171 * /api/mqtt/disconnect
172 * JSON.stringify({ clientId })
173 */
// Middleware registered for "/api/mqtt": sets the Access-Control-* response headers, then calls next().
174 jsonRouter.use("/api/mqtt", [] MIDDLEWARE(req, res, next) { // cppcheck-suppress unknownMacro
175 res->set({{"Access-Control-Allow-Origin", "*"},
176 {"Access-Control-Allow-Headers", "Content-Type"},
177 {"Access-Control-Allow-Methods", "GET, OPTIONS, POST"},
178 {"Access-Control-Allow-Private-Network", "true"}});
179 next();
180 });
181
// POST /api/mqtt/disconnect: closes the socket connection of the client named by "clientId",
// or answers 404 when no such client is found.
182 jsonRouter.post("/api/mqtt/disconnect", [] APPLICATION(req, res) {
183 VLOG(1) << "POST /disconnect";
184
// getAttribute takes two callbacks: the first receives the nlohmann::json attribute,
// the second receives a key and replies with status 400.
185 req->getAttribute<nlohmann::json>(
186 [&res](nlohmann::json& json) {
187 std::string jsonString = json.dump(4);
188
189 VLOG(1) << jsonString;
190
191 std::string clientId = json["clientId"].get<std::string>();
// NOTE(review): listing line 192 is missing here; `mqtt` is presumably obtained from
// MqttModel::instance().getMqtt(clientId) - confirm against the source file.
193
194 if (mqtt != nullptr) {
195 mqtt->getMqttContext()->getSocketConnection()->close();
196 res->send(R"({"success": true, "message": "Client disconnected successfully"})"_json.dump());
197 } else {
198 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
199 }
200 },
201 [&res](const std::string& key) {
202 VLOG(1) << "Attribute type not found: " << key;
203
204 res->status(400).send("Attribute type not found: " + key);
205 });
206 });
207
208 /*
209 * /api/mqtt/unsubscribe
210 * JSON.stringify({clientId, topic})
211 */
// POST /api/mqtt/unsubscribe: calls unsubscribe(topic) on the client named by "clientId", or answers 404.
212 jsonRouter.post("/api/mqtt/unsubscribe", [] APPLICATION(req, res) {
213 VLOG(1) << "POST /unsubscribe";
214
215 req->getAttribute<nlohmann::json>(
216 [&res](nlohmann::json& json) {
217 std::string jsonString = json.dump(4);
218
219 VLOG(1) << jsonString;
220
221 std::string clientId = json["clientId"].get<std::string>();
222 std::string topic = json["topic"].get<std::string>();
223
// NOTE(review): listing line 224 is missing here; presumably the same getMqtt(clientId) lookup
// that defines `mqtt` - confirm against the source file.
225
226 if (mqtt != nullptr) {
227 mqtt->unsubscribe(topic);
228 res->send(R"({"success": true, "message": "Client unsubscribed successfully"})"_json.dump());
229 } else {
230 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
231 }
232 },
233 [&res](const std::string& key) {
234 VLOG(1) << "Attribute type not found: " << key;
235
236 res->status(400).send("Attribute type not found: " + key);
237 });
238 });
239
240 /*
241 * /api/mqtt/release
242 * JSON.stringify({ topic })
243 */
// POST /api/mqtt/release: publishes through the broker on "topic" and always reports success.
244 jsonRouter.post("/api/mqtt/release", [broker] APPLICATION(req, res) {
245 VLOG(1) << "POST /release";
246
247 req->getAttribute<nlohmann::json>(
248 [&res, broker](nlohmann::json& json) {
249 std::string jsonString = json.dump(4);
250
251 VLOG(1) << jsonString;
252
253 std::string topic = json["topic"].get<std::string>();
254
// NOTE(review): presumably an empty payload published with the last argument (retain?) set to true
// clears the retained message for the topic - confirm against Broker::publish.
255 broker->publish("", topic, "", 0, true);
// NOTE(review): listing line 256 is missing here; contents unknown (possibly a
// MqttModel::publishMessage call) - confirm against the source file.
257
258 res->send(R"({"success": true, "message": "Retained message released successfully"})"_json.dump());
259 },
260 [&res](const std::string& key) {
261 VLOG(1) << "Attribute type not found: " << key;
262
263 res->status(400).send("Attribute type not found: " + key);
264 });
265 });
266
267 /*
268 * /api/mqtt/subscribe
269 * JSON.stringify({ clientId, topic, qos })
270 */
// POST /api/mqtt/subscribe: calls subscribe(topic, qoS) on the client named by "clientId", or answers 404.
271 jsonRouter.post("/api/mqtt/subscribe", [] APPLICATION(req, res) {
272 VLOG(1) << "POST /subscribe";
273
274 req->getAttribute<nlohmann::json>(
275 [&res](nlohmann::json& json) {
276 std::string jsonString = json.dump(4);
277
278 VLOG(1) << jsonString;
279
280 std::string clientId = json["clientId"].get<std::string>();
281 std::string topic = json["topic"].get<std::string>();
282 uint8_t qoS = json["qos"].get<uint8_t>();
283
// NOTE(review): listing line 284 is missing here; presumably the same getMqtt(clientId) lookup
// that defines `mqtt` - confirm against the source file.
285
286 if (mqtt != nullptr) {
287 mqtt->subscribe(topic, qoS);
288
289 res->send(R"({"success": true, "message": "Client subscribed successfully"})"_json.dump());
290 } else {
291 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
292 }
293 },
294 [&res](const std::string& key) {
295 VLOG(1) << "Attribute type not found: " << key;
296
297 res->status(400).send("Attribute type not found: " + key);
298 });
299 });
// Outer router: the JSON router is attached first, the remaining routes are registered after it.
300 const express::Router router;
301
302 router.use(jsonRouter);
303
// GET /api/mqtt/events: when the Accept header contains "text/event-stream" (ciContains), send the
// event-stream headers and register the response as an event receiver; otherwise redirect to /clients.
304 router.get("/api/mqtt/events", [broker] APPLICATION(req, res) {
305 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
306 res->set({{"Content-Type", "text/event-stream"},
307 {"Cache-Control", "no-cache"},
308 {"Connection", "keep-alive"},
309 {"Access-Control-Allow-Origin", "*"}});
310
311 res->sendHeader();
312
313 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"), broker);
314 } else {
315 res->redirect("/clients");
316 }
317 });
318
319 router.setStrictRouting();
// Two handlers on "/clients": a redirect to /clients/index.html and the StaticMiddleware
// configured with the value of the --html-dir option.
320 router.get("/clients", [] APPLICATION(req, res) {
321 res->redirect("/clients/index.html");
322 });
323
324 router.get("/clients", express::middleware::StaticMiddleware(utils::Config::getStringOptionValue("--html-dir")));
325
// /ws, /mqtt and / share the same logic: requests with an "upgrade" header go to upgrade(),
// everything else is redirected to /clients.
326 router.get("/ws", [] APPLICATION(req, res) {
327 if (req->headers.contains("upgrade")) {
328 upgrade(req, res);
329 } else {
330 res->redirect("/clients");
331 }
332 });
333
334 router.get("/mqtt", [] APPLICATION(req, res) {
335 if (req->headers.contains("upgrade")) {
336 upgrade(req, res);
337 } else {
338 res->redirect("/clients");
339 }
340 });
341
342 router.get("/", [] APPLICATION(req, res) {
343 if (req->headers.contains("upgrade")) {
344 upgrade(req, res);
345 } else {
346 res->redirect("/clients");
347 }
348 });
349
// GET /sse: same flow as /api/mqtt/events, but no Access-Control-Allow-Origin header is set here.
350 router.get("/sse", [broker] APPLICATION(req, res) {
351 if (web::http::ciContains(req->get("Accept"), "text/event-stream")) {
352 res->set("Content-Type", "text/event-stream") //
353 .set("Cache-Control", "no-cache")
354 .set("Connection", "keep-alive");
355 res->sendHeader();
356
357 mqtt::mqttbroker::lib::MqttModel::instance().addEventReceiver(res, req->get("Last-Event-ID"), broker);
358 } else {
359 res->redirect("/clients");
360 }
361 });
362
363 return router;
364}
nlohmann::json json
void publishMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
Mqtt * getMqtt(const std::string &clientId) const
static void upgrade APPLICATION(req, res)

References mqtt::mqttbroker::lib::MqttModel::addEventReceiver(), and mqtt::mqttbroker::lib::MqttModel::instance().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 384 of file mqttbroker.cpp.

// Body of main(): adds the application's string options, initializes SNodeC, obtains the broker
// instance, creates the MQTT servers and the express HTTP(S) servers selected by the
// CONFIG_MQTTSUITE_BROKER_* macros, and returns the result of core::SNodeC::start().
// Each TLS #ifdef is nested inside the matching non-TLS #ifdef, so a TLS server is only
// compiled when the corresponding plain option is defined as well.
384 {
// --html-dir defaults to a directory below CMAKE_INSTALL_PREFIX; the other two options default to "".
385 utils::Config::addStringOption("--mqtt-mapping-file", "MQTT mapping file (json format) for integration", "[path]", "");
386 utils::Config::addStringOption("--mqtt-session-store", "Path to file for the persistent session store", "[path]", "");
387 utils::Config::addStringOption(
388 "--html-dir", "Path to html source directory", "[path]", std::string(CMAKE_INSTALL_PREFIX) + "/var/www/mqttsuite/mqttbroker");
389
390 core::SNodeC::init(argc, argv);
391
// The same broker object is passed to every MQTT server below and to getRouter().
392 std::shared_ptr<iot::mqtt::server::broker::Broker> broker =
393 iot::mqtt::server::broker::Broker::instance(SUBSCRIPTION_MAX_QOS, utils::Config::getStringOptionValue("--mqtt-session-store"));
394
// MQTT over IPv4: instance "in-mqtt" on port 1883, TLS instance "in-mqtts" on port 8883.
395#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
396 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
397 "in-mqtt",
398 [](auto& config) {
399 config.setPort(1883);
400 config.setRetry();
401 config.setDisableNagleAlgorithm();
402 },
403 broker)
404 .listen([](const auto& socketAddress, core::socket::State state) {
405 reportState("in-mqtt", socketAddress, state);
406 });
407
408#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
409 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
410 "in-mqtts",
411 [](auto& config) {
412 config.setPort(8883);
413 config.setRetry();
414 config.setDisableNagleAlgorithm();
415 },
416 broker)
417 .listen([](const auto& socketAddress, core::socket::State state) {
418 reportState("in-mqtts", socketAddress, state);
419 });
420#endif
421#endif
422
// MQTT over IPv6: same ports as IPv4, with setIPv6Only() called on the configuration.
423#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
424 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
425 "in6-mqtt",
426 [](auto& config) {
427 config.setPort(1883);
428 config.setRetry();
429 config.setDisableNagleAlgorithm();
430
431 config.setIPv6Only();
432 },
433 broker)
434 .listen([](const auto& socketAddress, core::socket::State state) {
435 reportState("in6-mqtt", socketAddress, state);
436 });
437
438#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
439 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
440 "in6-mqtts",
441 [](auto& config) {
442 config.setPort(8883);
443 config.setRetry();
444 config.setDisableNagleAlgorithm();
445
446 config.setIPv6Only();
447 },
448 broker)
449 .listen([](const auto& socketAddress, core::socket::State state) {
450 reportState("in6-mqtts", socketAddress, state);
451 });
452#endif
453#endif
454
// MQTT over the net::un servers: the sun path is "/tmp/<application name>-<instance name>".
455#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
456 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
457 "un-mqtt",
458 [](auto& config) {
459 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
460 config.setRetry();
461 },
462 broker)
463 .listen([](const auto& socketAddress, core::socket::State state) {
464 reportState("un-mqtt", socketAddress, state);
465 });
466
467#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
468 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
469 "un-mqtts",
470 [](auto& config) {
471 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
472 config.setRetry();
473 },
474 broker)
475 .listen([](const auto& socketAddress, core::socket::State state) {
476 reportState("un-mqtts", socketAddress, state);
477 });
478#endif
479#endif
480
// HTTP side: one router, built once, is handed to every express server below.
481 inja::Environment environment{utils::Config::getStringOptionValue("--html-dir") + "/"};
482 express::Router router = getRouter(environment, broker);
483
// HTTP on port 8080 and HTTPS on port 8088; reportState is passed directly as the status callback.
484#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
485 express::legacy::in::Server("in-http", router, reportState, [](auto& config) {
486 config.setPort(8080);
487 config.setRetry();
488 config.setDisableNagleAlgorithm();
489 });
490
491#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
492 express::tls::in::Server("in-https", router, reportState, [](auto& config) {
493 config.setPort(8088);
494 config.setRetry();
495 config.setDisableNagleAlgorithm();
496 });
497#endif
498#endif
499
500#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
501 express::legacy::in6::Server("in6-http", router, reportState, [](auto& config) {
502 config.setPort(8080);
503 config.setRetry();
504 config.setDisableNagleAlgorithm();
505
506 config.setIPv6Only();
507 });
508
509#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
510 express::tls::in6::Server("in6-https", router, reportState, [](auto& config) {
511 config.setPort(8088);
512 config.setRetry();
513 config.setDisableNagleAlgorithm();
514
515 config.setIPv6Only();
516 });
517#endif
518#endif
519
// Unlike the un-mqtt servers above, these configurations only set the sun path (no setRetry() call).
520#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
521 express::legacy::un::Server("un-http", router, reportState, [](auto& config) {
522 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
523 });
524
525#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
526 express::tls::un::Server("un-https", router, reportState, [](auto& config) {
527 config.setSunPath("/tmp/" + utils::Config::getApplicationName() + "-" + config.getInstanceName());
528 });
529#endif
530#endif
531
532 return core::SNodeC::start();
533}
Class for changing the configuration.
Definition inja.hpp:2851
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static express::Router getRouter(const inja::Environment &environment, std::shared_ptr< iot::mqtt::server::broker::Broker > broker)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)

References getRouter().

Here is the call graph for this function:

◆ reportState()

void reportState ( const std::string & instanceName,
const core::socket::SocketAddress & socketAddress,
const core::socket::State & state )
static

Definition at line 367 of file mqttbroker.cpp.

// Body of reportState(): writes one VLOG(1) line describing the state reported for the server
// instance `instanceName` at `socketAddress`.
367 {
368 switch (state) {
369 case core::socket::State::OK:
370 VLOG(1) << instanceName << ": listening on '" << socketAddress.toString() << "'";
371 break;
372 case core::socket::State::DISABLED:
373 VLOG(1) << instanceName << ": disabled";
374 break;
// ERROR and FATAL produce the same message: instance name, address and state.what().
375 case core::socket::State::ERROR:
376 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
377 break;
378 case core::socket::State::FATAL:
379 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
380 break;
381 }
382}