135static void upgrade APPLICATION(req, res) {
136 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
138 VLOG(2) << connectionName <<
" HTTP: Upgrade request:\n"
139 << httputils::toString(req->method,
141 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
146 std::vector<
char>());
148 if (req->get(
"sec-websocket-protocol").find(
"mqtt") != std::string::npos) {
149 res->upgrade(req, [req, res, connectionName](
const std::string& name) {
151 VLOG(1) << connectionName <<
": Successful upgrade:";
152 VLOG(1) << connectionName <<
": Selected: " << name;
153 VLOG(1) << connectionName <<
": Requested: " << req->get(
"sec-websocket-protocol");
157 VLOG(1) << connectionName <<
": Can not upgrade to any of '" << req->get(
"upgrade") <<
"'";
159 res->sendStatus(404);
163 VLOG(1) << connectionName <<
": Unsupported subprotocol(s):";
164 VLOG(1) <<
" Expected: mqtt";
165 VLOG(1) <<
" Requested: " << req->get(
"sec-websocket-protocol");
167 res->sendStatus(404);
196 std::shared_ptr<iot::
mqtt::server::
broker::Broker> broker,
200 json[
"table-id"] =
"overview";
201 json[
"title"] =
"MQTTBroker | Active Clients";
202 json[
"voc"] = href(
"Volker Christian",
"https://github.com/VolkerChristian/");
203 json[
"broker"] = href(
"MQTTBroker",
"https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker");
204 json[
"suite"] = href(
"MQTTSuite",
"https://github.com/SNodeC/mqttsuite");
205 json[
"snodec"] =
"Powered by " + href(
"SNode.C",
"https://github.com/SNodeC/snode.c");
208 json[
"header_row"] = {
"Client ID",
"Online Since",
"Duration",
"Connection",
"Local Address",
"Remote Address",
"Action"};
209 json[
"data_rows"] = inja::json::array();
210 json[
"subscribed_topics"] = inja::json::array();
211 json[
"retained_topics"] = inja::json::array();
213 const std::map<std::string, std::list<std::pair<std::string, uint8_t>>>& subscribedTopics = broker->getSubscriptionTree();
215 inja::json& subscribedTopicsJson = json[
"subscribed_topics"];
216 for (
const auto& [topic, clients] : subscribedTopics) {
217 inja::json topicJson;
219 topicJson[
"key"] = topic;
220 for (
const auto& client : clients) {
221 std::ostringstream windowId(
"window");
222 for (
char ch : client.first) {
223 if (std::isalnum(
static_cast<
unsigned char>(ch))) {
226 windowId << std::hex << std::uppercase << std::setw(2) << std::setfill(
'0')
227 <<
static_cast<
int>(
static_cast<
unsigned char>(ch));
231 topicJson[
"values"].push_back({{
"client_id", client.first},
232 {
"link", href(client.first,
"/client?" + client.first, windowId.str(), 450, 900)},
234 {
"qos", std::to_string(
static_cast<
int>(client.second))}});
237 subscribedTopicsJson.push_back(topicJson);
240 std::list<std::pair<std::string, std::string>> retainTree = broker->getRetainTree();
242 inja::json& retainedTopicsJson = json[
"retained_topics"];
243 for (
const auto& [topic, message] : retainTree) {
244 inja::json topicJson;
246 topicJson[
"key"] = topic;
247 topicJson[
"values"].push_back(message);
249 retainedTopicsJson.push_back(topicJson);
252 return environment.render_file(
"OverviewPage.html", json);
256 return environment.render_file(
"DetailPage.html",
257 {{
"table-id",
"detail"},
258 {
"client_id", mqtt->getClientId()},
259 {
"title", mqtt->getClientId()},
260 {
"header_row", {
"Attribute",
"Value"}},
262 inja::json::array({{
"Client ID", mqtt->getClientId()},
263 {
"Connection", mqtt->getConnectionName()},
264 {
"Clean Session", mqtt->getCleanSession() ?
"true" :
"false"},
265 {
"Connect Flags", std::to_string(mqtt->getConnectFlags())},
266 {
"Username", mqtt->getUsername()},
267 {
"Username Flag", mqtt->getUsernameFlag() ?
"true" :
"false"},
268 {
"Password", mqtt->getPassword()},
269 {
"Password Flag", mqtt->getPasswordFlag() ?
"true" :
"false"},
270 {
"Keep Alive", std::to_string(mqtt->getKeepAlive())},
271 {
"Protocol", mqtt->getProtocol()},
272 {
"Protocol Level", std::to_string(mqtt->getLevel())},
273 {
"Loop Prevention", !mqtt->getReflect() ?
"true" :
"false"},
274 {
"Will Message", mqtt->getWillMessage()},
275 {
"Will Topic", mqtt->getWillTopic()},
276 {
"Will QoS", std::to_string(mqtt->getWillQoS())},
277 {
"Will Flag", mqtt->getWillFlag() ?
"true" :
"false"},
278 {
"Will Retain", mqtt->getWillRetain() ?
"true" :
"false"}})},
279 {
"client_id", mqtt->getClientId()},
280 {
"topics", mqtt->getSubscriptions()}});
321 const express::Router& jsonRouter =
express::middleware::JsonMiddleware();
323 jsonRouter.post(
"/disconnect", [] APPLICATION(req, res) {
324 VLOG(1) <<
"POST /disconnect";
326 req->getAttribute<nlohmann::json>(
327 [&res](nlohmann::json& json) {
328 std::string jsonString = json.dump(4);
330 VLOG(1) << jsonString;
332 std::string clientId = json[
"client_id"].get<std::string>();
333 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
335 if (mqtt !=
nullptr) {
336 mqtt->getMqttContext()->getSocketConnection()->close();
337 res->send(jsonString);
339 res->status(404).send(
"MQTT client has never existed or already gone away: '" + clientId +
"'");
342 [&res](
const std::string& key) {
343 VLOG(1) <<
"Attribute type not found: " << key;
345 res->status(400).send(
"Attribute type not found: " + key);
349 jsonRouter.post(
"/unsubscribe", [] APPLICATION(req, res) {
350 VLOG(1) <<
"POST /unsubscribe";
352 req->getAttribute<nlohmann::json>(
353 [&res](nlohmann::json& json) {
354 std::string jsonString = json.dump(4);
356 VLOG(1) << jsonString;
358 std::string clientId = json[
"client_id"].get<std::string>();
359 std::string topic = json[
"topic"].get<std::string>();
361 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
363 if (mqtt !=
nullptr) {
364 mqtt->unsubscribe(topic);
365 res->send(jsonString);
367 res->status(404).send(
"MQTT client has never existed or already gone away: '" + clientId +
"'");
370 [&res](
const std::string& key) {
371 VLOG(1) <<
"Attribute type not found: " << key;
373 res->status(400).send(
"Attribute type not found: " + key);
377 jsonRouter.post(
"/release", [broker] APPLICATION(req, res) {
378 VLOG(1) <<
"POST /release";
380 req->getAttribute<nlohmann::json>(
381 [&res, broker](nlohmann::json& json) {
382 std::string jsonString = json.dump(4);
384 VLOG(1) << jsonString;
386 std::string topic = json[
"topic"].get<std::string>();
388 broker->release(topic);
390 res->send(jsonString);
392 [&res](
const std::string& key) {
393 VLOG(1) <<
"Attribute type not found: " << key;
395 res->status(400).send(
"Attribute type not found: " + key);
399 jsonRouter.post(
"/subscribe", [] APPLICATION(req, res) {
400 VLOG(1) <<
"POST /subscribe";
402 req->getAttribute<nlohmann::json>(
403 [&res](nlohmann::json& json) {
404 std::string jsonString = json.dump(4);
406 VLOG(1) << jsonString;
408 std::string clientId = json[
"client_id"].get<std::string>();
409 std::string topic = json[
"topic"].get<std::string>();
410 uint8_t qoS = json[
"qos"].get<uint8_t>();
412 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
414 if (mqtt !=
nullptr) {
415 mqtt->subscribe(topic, qoS);
416 res->send(jsonString);
418 res->status(404).send(
"MQTT client has never existed or already gone away: '" + clientId +
"'");
421 [&res](
const std::string& key) {
422 VLOG(1) <<
"Attribute type not found: " << key;
424 res->status(400).send(
"Attribute type not found: " + key);
429 router.use(jsonRouter);
431 router.get(
"/clients", [environment, broker] APPLICATION(req, res) {
432 std::string responseString;
433 int responseStatus = 200;
438 responseStatus = 500;
439 responseString =
"Internal Server Error\n";
440 responseString += std::string(error.what()) +
" " + error
.type +
" " + error
.message +
" " +
444 res->status(responseStatus).send(responseString);
447 router.get(
"/client", [environment] APPLICATION(req, res) {
448 std::string responseString;
449 int responseStatus = 200;
451 if (req->queries.size() == 1) {
452 const mqtt::mqttbroker::
lib::
Mqtt* mqtt =
455 if (mqtt !=
nullptr) {
456 VLOG(1) <<
"Subscriptions for client " << mqtt->getClientId();
457 for (
const std::string& subscription : mqtt->getSubscriptions()) {
458 VLOG(1) <<
" " << subscription;
464 responseStatus = 500;
465 responseString =
"Internal Server Error\n";
466 responseString += std::string(error.what()) +
" " + error
.type +
" " + error
.message +
" " +
470 responseStatus = 404;
471 responseString =
"Not Found: " + urlDecode(req->queries.begin()->first);
474 responseStatus = 400;
475 responseString =
"Bad Request: No Client requested";
478 res->status(responseStatus).send(responseString);
481 router.get(
"/spinner", [environment] APPLICATION(req, res) {
482 std::string responseString;
483 int responseStatus = 200;
485 if (req->queries.size() == 1) {
486 responseString = getRedirectSpinnerPage(environment, req->queries.begin()->first);
488 responseStatus = 400;
489 responseString =
"Bad Request: No Client requested";
491 res->status(responseStatus).send(responseString);
494 router.get(
"/ws", [] APPLICATION(req, res) {
495 if (req->headers.contains(
"upgrade")) {
498 res->redirect(
"/spinner?/clients");
502 router.get(
"/mqtt", [] APPLICATION(req, res) {
503 if (req->headers.contains(
"upgrade")) {
506 res->redirect(
"/spinner?/clients");
510 router.get(
"/", [] APPLICATION(req, res) {
511 if (req->headers.contains(
"upgrade")) {
514 res->redirect(
"/spinner?/clients");
518 router.get(
"/sse", [] APPLICATION(req, res) {
519 if (web::http::ciContains(req->get(
"Accept"),
"text/event-stream")) {
520 res->set(
"Content-Type",
"text/event-stream")
521 .set(
"Cache-Control",
"no-cache")
522 .set(
"Connection",
"keep-alive");
527 res->redirect(
"/spinner?/clients");
535reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
537 case core::socket::State::OK:
538 VLOG(1) << instanceName <<
": listening on '" << socketAddress.toString() <<
"'";
540 case core::socket::State::DISABLED:
541 VLOG(1) << instanceName <<
": disabled";
543 case core::socket::State::ERROR:
544 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
546 case core::socket::State::FATAL:
547 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
552int main(
int argc,
char* argv[]) {
553 utils::Config::addStringOption(
"--mqtt-mapping-file",
"MQTT mapping file (json format) for integration",
"[path]",
"");
554 utils::Config::addStringOption(
"--mqtt-session-store",
"Path to file for the persistent session store",
"[path]",
"");
555 utils::Config::addStringOption(
556 "--html-dir",
"Path to html source directory",
"[path]", std::string(CMAKE_INSTALL_PREFIX) +
"/var/www/mqttsuite/mqttbroker");
558 core::SNodeC::init(argc, argv);
560 std::shared_ptr<iot::
mqtt::server::
broker::Broker> broker =
561 iot::
mqtt::server::
broker::Broker::instance(SUBSCRIPTION_MAX_QOS, utils::Config::getStringOptionValue(
"--mqtt-session-store"));
563#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
564 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
567 config.setPort(1883);
569 config.setDisableNagleAlgorithm();
572 .listen([](
const auto& socketAddress, core::socket::State state) {
573 reportState(
"in-mqtt", socketAddress, state);
576#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
577 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
580 config.setPort(8883);
582 config.setDisableNagleAlgorithm();
585 .listen([](
const auto& socketAddress, core::socket::State state) {
586 reportState(
"in-mqtts", socketAddress, state);
591#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
592 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
595 config.setPort(1883);
597 config.setDisableNagleAlgorithm();
599 config.setIPv6Only();
602 .listen([](
const auto& socketAddress, core::socket::State state) {
603 reportState(
"in6-mqtt", socketAddress, state);
606#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
607 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
610 config.setPort(8883);
612 config.setDisableNagleAlgorithm();
614 config.setIPv6Only();
617 .listen([](
const auto& socketAddress, core::socket::State state) {
618 reportState(
"in6-mqtts", socketAddress, state);
623#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
624 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
627 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
631 .listen([](
const auto& socketAddress, core::socket::State state) {
632 reportState(
"un-mqtt", socketAddress, state);
635#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
636 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
639 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
643 .listen([](
const auto& socketAddress, core::socket::State state) {
644 reportState(
"un-mqtts", socketAddress, state);
649 inja::
Environment environment{utils::Config::getStringOptionValue(
"--html-dir") +
"/"};
652#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
653 express::legacy::in::Server(
"in-http", router, reportState, [](
auto& config) {
654 config.setPort(8080);
656 config.setDisableNagleAlgorithm();
659#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
660 express::tls::in::Server(
"in-https", router, reportState, [](
auto& config) {
661 config.setPort(8088);
663 config.setDisableNagleAlgorithm();
668#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
669 express::legacy::in6::Server(
"in6-http", router, reportState, [](
auto& config) {
670 config.setPort(8080);
672 config.setDisableNagleAlgorithm();
674 config.setIPv6Only();
677#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
678 express::tls::in6::Server(
"in6-https", router, reportState, [](
auto& config) {
679 config.setPort(8088);
681 config.setDisableNagleAlgorithm();
683 config.setIPv6Only();
688#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
689 express::legacy::un::Server(
"un-http", router, reportState, [](
auto& config) {
690 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
693#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
694 express::tls::un::Server(
"un-https", router, reportState, [](
auto& config) {
695 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
700 return core::SNodeC::start();