// Handles an HTTP upgrade request: logs the request, and when the
// "sec-websocket-protocol" header contains "mqtt" asks the response object to
// perform the upgrade; otherwise logs the mismatch and replies with status 404.
// NOTE(review): this listing carries fused line-number prefixes and contains no
// closing braces or else-branches, so lines appear to have been dropped during
// extraction — restore the block from version control before building.
130static void upgrade APPLICATION(req, res) {
// The connection name prefixes the log lines below.
131 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
// Verbosity-2 dump of the request: method, "HTTP/<major>.<minor>", ..., empty body vector.
// NOTE(review): the original line numbers jump (134 -> 136 -> 141), so further
// arguments to httputils::toString are presumably missing here — confirm.
133 VLOG(2) << connectionName <<
" HTTP: Upgrade request:\n"
134 << httputils::toString(req->method,
136 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
141 std::vector<
char>());
// Substring test on the offered subprotocols.
// NOTE(review): find("mqtt") also accepts any token that merely contains "mqtt".
143 if (req->get(
"sec-websocket-protocol").find(
"mqtt") != std::string::npos) {
// Upgrade callback; `name` is what gets logged as the selected subprotocol.
// req, res and connectionName are captured by value.
144 res->upgrade(req, [req, res, connectionName](
const std::string& name) {
146 VLOG(1) << connectionName <<
": Successful upgrade:";
147 VLOG(1) << connectionName <<
": Selected: " << name;
148 VLOG(1) << connectionName <<
": Requested: " << req->get(
"sec-websocket-protocol");
// Failure path of the callback: nothing from the "upgrade" header could be selected -> 404.
// NOTE(review): the condition separating success from failure is not present in this listing.
152 VLOG(1) << connectionName <<
": Can not upgrade to any of '" << req->get(
"upgrade") <<
"'";
154 res->sendStatus(404);
// The subprotocol header did not contain "mqtt": log expected vs. requested and reply 404.
158 VLOG(1) << connectionName <<
": Unsupported subprotocol(s):";
159 VLOG(1) <<
" Expected: mqtt";
160 VLOG(1) <<
" Requested: " << req->get(
"sec-websocket-protocol");
162 res->sendStatus(404);
// NOTE(review): the first line of this function's signature is not present in
// this listing; only the trailing `broker` parameter is visible.
// `broker` is a shared handle to the MQTT broker; the route handlers below capture it by value.
167 std::shared_ptr<iot::
mqtt::server::
broker::Broker> broker) {
// Router created from the JSON middleware; the "/api/mqtt" routes below are registered on it.
168 const express::Router& jsonRouter =
express::middleware::JsonMiddleware();
171
172
173
// Middleware mounted at "/api/mqtt": sets CORS response headers allowing any
// origin, the Content-Type request header, and the GET/OPTIONS/POST methods,
// plus Access-Control-Allow-Private-Network.
// NOTE(review): the rest of this handler (presumably handing control on via
// `next`) is not present in this listing — confirm.
174 jsonRouter.use(
"/api/mqtt", [] MIDDLEWARE(req, res, next) {
175 res->set({{
"Access-Control-Allow-Origin",
"*"},
176 {
"Access-Control-Allow-Headers",
"Content-Type"},
177 {
"Access-Control-Allow-Methods",
"GET, OPTIONS, POST"},
178 {
"Access-Control-Allow-Private-Network",
"true"}});
// POST /api/mqtt/disconnect: reads "clientId" from the JSON request attribute,
// looks the client up in MqttModel and, when found, closes its socket
// connection and reports success; the "Client not found" reply uses status 404.
182 jsonRouter.post(
"/api/mqtt/disconnect", [] APPLICATION(req, res) {
183 VLOG(1) <<
"POST /disconnect";
// First callback: receives the nlohmann::json attribute of the request.
// NOTE(review): `res` is captured by reference — safe only if getAttribute
// invokes its callbacks before this handler returns; confirm.
185 req->getAttribute<nlohmann::json>(
186 [&res](nlohmann::json& json) {
// Pretty-printed (indent 4) copy of the body, for logging only.
187 std::string jsonString = json.dump(4);
189 VLOG(1) << jsonString;
// NOTE(review): operator[] followed by get<std::string>() throws when
// "clientId" is absent or not a string — confirm this is handled upstream.
191 std::string clientId = json[
"clientId"].get<std::string>();
192 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
// nullptr means no client with this id is known to the model.
194 if (mqtt !=
nullptr) {
195 mqtt->getMqttContext()->getSocketConnection()->close();
196 res->send(R"({"success": true, "message": "Client disconnected successfully"})"_json.dump());
// NOTE(review): the `else` introducing this 404 reply is not present in this listing.
198 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
// Second callback: the json attribute was not available; `key` is echoed in a 400 reply.
201 [&res](
const std::string& key) {
202 VLOG(1) <<
"Attribute type not found: " << key;
204 res->status(400).send(
"Attribute type not found: " + key);
209
210
211
// POST /api/mqtt/unsubscribe: reads "clientId" and "topic" from the JSON
// request attribute and, when the client is known to MqttModel, calls
// unsubscribe(topic) on it; the "Client not found" reply uses status 404.
212 jsonRouter.post(
"/api/mqtt/unsubscribe", [] APPLICATION(req, res) {
213 VLOG(1) <<
"POST /unsubscribe";
// First callback: receives the nlohmann::json attribute of the request.
// NOTE(review): `res` is captured by reference — safe only if getAttribute
// invokes its callbacks before this handler returns; confirm.
215 req->getAttribute<nlohmann::json>(
216 [&res](nlohmann::json& json) {
// Pretty-printed (indent 4) copy of the body, for logging only.
217 std::string jsonString = json.dump(4);
219 VLOG(1) << jsonString;
// NOTE(review): get<std::string>() throws when a field is absent or not a
// string — confirm this is handled upstream.
221 std::string clientId = json[
"clientId"].get<std::string>();
222 std::string topic = json[
"topic"].get<std::string>();
224 mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
// nullptr means no client with this id is known to the model.
226 if (mqtt !=
nullptr) {
227 mqtt->unsubscribe(topic);
228 res->send(R"({"success": true, "message": "Client unsubscribed successfully"})"_json.dump());
// NOTE(review): the `else` introducing this 404 reply is not present in this listing.
230 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
// Second callback: the json attribute was not available; `key` is echoed in a 400 reply.
233 [&res](
const std::string& key) {
234 VLOG(1) <<
"Attribute type not found: " << key;
236 res->status(400).send(
"Attribute type not found: " + key);
241
242
243
// POST /api/mqtt/release: reads "topic" from the JSON request attribute and
// publishes an empty payload for it, both through the broker and through
// MqttModel, then reports "Retained message released successfully".
244 jsonRouter.post(
"/api/mqtt/release", [broker] APPLICATION(req, res) {
245 VLOG(1) <<
"POST /release";
// First callback: receives the nlohmann::json attribute; `broker` is copied into the closure.
// NOTE(review): `res` is captured by reference — safe only if getAttribute
// invokes its callbacks before this handler returns; confirm.
247 req->getAttribute<nlohmann::json>(
248 [&res, broker](nlohmann::json& json) {
// Pretty-printed (indent 4) copy of the body, for logging only.
249 std::string jsonString = json.dump(4);
251 VLOG(1) << jsonString;
// NOTE(review): get<std::string>() throws when "topic" is absent or not a
// string — confirm this is handled upstream.
253 std::string topic = json[
"topic"].get<std::string>();
// Empty originator, empty message, 0 and true — presumably QoS 0 with the
// retain flag set, i.e. clearing the retained message; confirm against
// Broker::publish.
255 broker->publish(
"", topic,
"", 0,
true);
// The same empty publication is passed to the model.
256 mqtt::mqttbroker::lib::MqttModel::instance().publishMessage(topic,
"", 0,
true);
258 res->send(R"({"success": true, "message": "Retained message released successfully"})"_json.dump());
// Second callback: the json attribute was not available; `key` is echoed in a 400 reply.
260 [&res](
const std::string& key) {
261 VLOG(1) <<
"Attribute type not found: " << key;
263 res->status(400).send(
"Attribute type not found: " + key);
268
269
270
// POST /api/mqtt/subscribe: reads "clientId", "topic" and "qos" from the JSON
// request attribute and, when the client is known to MqttModel, calls
// subscribe(topic, qoS) on it; the "Client not found" reply uses status 404.
271 jsonRouter.post(
"/api/mqtt/subscribe", [] APPLICATION(req, res) {
272 VLOG(1) <<
"POST /subscribe";
// First callback: receives the nlohmann::json attribute of the request.
// NOTE(review): `res` is captured by reference — safe only if getAttribute
// invokes its callbacks before this handler returns; confirm.
274 req->getAttribute<nlohmann::json>(
275 [&res](nlohmann::json& json) {
// Pretty-printed (indent 4) copy of the body, for logging only.
276 std::string jsonString = json.dump(4);
278 VLOG(1) << jsonString;
// NOTE(review): get<T>() throws when a field is absent or has the wrong
// type — confirm this is handled upstream.
280 std::string clientId = json[
"clientId"].get<std::string>();
281 std::string topic = json[
"topic"].get<std::string>();
// NOTE(review): qoS is forwarded without a range check; MQTT defines QoS
// 0..2 — confirm subscribe() validates it.
282 uint8_t qoS = json[
"qos"].get<uint8_t>();
284 mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
// nullptr means no client with this id is known to the model.
286 if (mqtt !=
nullptr) {
287 mqtt->subscribe(topic, qoS);
289 res->send(R"({"success": true, "message": "Client subscribed successfully"})"_json.dump());
// NOTE(review): the `else` introducing this 404 reply is not present in this listing.
291 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
// Second callback: the json attribute was not available; `key` is echoed in a 400 reply.
294 [&res](
const std::string& key) {
295 VLOG(1) <<
"Attribute type not found: " << key;
297 res->status(400).send(
"Attribute type not found: " + key);
// Attach the JSON API router to the main router.
302 router.use(jsonRouter);
// GET /api/mqtt/events: when the Accept header contains "text/event-stream",
// set event-stream response headers (plus a wildcard CORS origin).
// NOTE(review): what follows the header setup is not present in this listing;
// the redirect to "/clients" is presumably the non-event-stream branch — confirm.
304 router.get(
"/api/mqtt/events", [broker] APPLICATION(req, res) {
305 if (web::http::ciContains(req->get(
"Accept"),
"text/event-stream")) {
306 res->set({{
"Content-Type",
"text/event-stream"},
307 {
"Cache-Control",
"no-cache"},
308 {
"Connection",
"keep-alive"},
309 {
"Access-Control-Allow-Origin",
"*"}});
315 res->redirect(
"/clients");
// Strict routing is switched on before the "/clients" routes are registered.
319 router.setStrictRouting();
// GET /clients: redirect to the index page.
320 router.get(
"/clients", [] APPLICATION(req, res) {
321 res->redirect(
"/clients/index.html");
// GET /clients: served by the static middleware from the --html-dir directory.
324 router.get(
"/clients",
express::middleware::StaticMiddleware(utils::Config::getStringOptionValue(
"--html-dir")));
// GET /ws, /mqtt and /: each checks for an "upgrade" request header.
// NOTE(review): the statement executed when the header is present is missing
// from this listing; the redirect to "/clients" is presumably the fallback — confirm.
326 router.get(
"/ws", [] APPLICATION(req, res) {
327 if (req->headers.contains(
"upgrade")) {
330 res->redirect(
"/clients");
334 router.get(
"/mqtt", [] APPLICATION(req, res) {
335 if (req->headers.contains(
"upgrade")) {
338 res->redirect(
"/clients");
342 router.get(
"/", [] APPLICATION(req, res) {
343 if (req->headers.contains(
"upgrade")) {
346 res->redirect(
"/clients");
// GET /sse: same Accept check as /api/mqtt/events, setting the event-stream
// headers through chained set() calls and without the CORS origin header.
350 router.get(
"/sse", [broker] APPLICATION(req, res) {
351 if (web::http::ciContains(req->get(
"Accept"),
"text/event-stream")) {
352 res->set(
"Content-Type",
"text/event-stream")
353 .set(
"Cache-Control",
"no-cache")
354 .set(
"Connection",
"keep-alive");
359 res->redirect(
"/clients");
367reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
369 case core::socket::State::OK:
370 VLOG(1) << instanceName <<
": listening on '" << socketAddress.toString() <<
"'";
372 case core::socket::State::DISABLED:
373 VLOG(1) << instanceName <<
": disabled";
375 case core::socket::State::ERROR:
376 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
378 case core::socket::State::FATAL:
379 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
// Program entry point: registers the command-line options, initialises the
// framework, creates the broker, sets up the MQTT and HTTP server instances
// selected at compile time, and hands control to core::SNodeC::start().
// NOTE(review): this listing carries fused line-number prefixes and contains no
// closing braces, `#endif`s or lambda headers for the MQTT servers, so lines
// appear to have been dropped during extraction — restore from version control
// before building.
384int main(
int argc,
char* argv[]) {
// Options are registered before core::SNodeC::init is called with argc/argv.
385 utils::Config::addStringOption(
"--mqtt-mapping-file",
"MQTT mapping file (json format) for integration",
"[path]",
"");
386 utils::Config::addStringOption(
"--mqtt-session-store",
"Path to file for the persistent session store",
"[path]",
"");
// Default html directory is derived from the build-time CMAKE_INSTALL_PREFIX.
387 utils::Config::addStringOption(
388 "--html-dir",
"Path to html source directory",
"[path]", std::string(CMAKE_INSTALL_PREFIX) +
"/var/www/mqttsuite/mqttbroker");
390 core::SNodeC::init(argc, argv);
// Broker instance, obtained with SUBSCRIPTION_MAX_QOS and the session-store path.
392 std::shared_ptr<iot::
mqtt::server::
broker::Broker> broker =
393 iot::
mqtt::server::
broker::Broker::instance(SUBSCRIPTION_MAX_QOS, utils::Config::getStringOptionValue(
"--mqtt-session-store"));
// --- MQTT listeners; each reports its listen result through reportState ---
// IPv4, legacy (non-TLS) stream: port 1883.
395#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
396 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
399 config.setPort(1883);
401 config.setDisableNagleAlgorithm();
404 .listen([](
const auto& socketAddress, core::socket::State state) {
405 reportState(
"in-mqtt", socketAddress, state);
// IPv4, TLS stream: port 8883.
408#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
409 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
412 config.setPort(8883);
414 config.setDisableNagleAlgorithm();
417 .listen([](
const auto& socketAddress, core::socket::State state) {
418 reportState(
"in-mqtts", socketAddress, state);
// IPv6, legacy stream: port 1883, with setIPv6Only().
423#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
424 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
427 config.setPort(1883);
429 config.setDisableNagleAlgorithm();
431 config.setIPv6Only();
434 .listen([](
const auto& socketAddress, core::socket::State state) {
435 reportState(
"in6-mqtt", socketAddress, state);
// IPv6, TLS stream: port 8883, with setIPv6Only().
438#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
439 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
442 config.setPort(8883);
444 config.setDisableNagleAlgorithm();
446 config.setIPv6Only();
449 .listen([](
const auto& socketAddress, core::socket::State state) {
450 reportState(
"in6-mqtts", socketAddress, state);
// Unix-domain, legacy stream: socket path "/tmp/<application name>-<instance name>".
// NOTE(review): the path lives in /tmp — confirm that is acceptable for deployment.
455#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
456 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
459 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
463 .listen([](
const auto& socketAddress, core::socket::State state) {
464 reportState(
"un-mqtt", socketAddress, state);
// Unix-domain, TLS stream: same path scheme.
467#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
468 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
471 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
475 .listen([](
const auto& socketAddress, core::socket::State state) {
476 reportState(
"un-mqtts", socketAddress, state);
// inja template environment rooted at "<--html-dir>/".
// NOTE(review): the statement that creates `router` is not present in this listing.
481 inja::
Environment environment{utils::Config::getStringOptionValue(
"--html-dir") +
"/"};
// --- HTTP(S) front ends sharing `router`; reportState is passed as the status callback ---
// IPv4 HTTP: port 8080.
484#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
485 express::legacy::in::Server(
"in-http", router, reportState, [](
auto& config) {
486 config.setPort(8080);
488 config.setDisableNagleAlgorithm();
// IPv4 HTTPS: port 8088.
491#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
492 express::tls::in::Server(
"in-https", router, reportState, [](
auto& config) {
493 config.setPort(8088);
495 config.setDisableNagleAlgorithm();
// IPv6 HTTP: port 8080, with setIPv6Only().
500#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
501 express::legacy::in6::Server(
"in6-http", router, reportState, [](
auto& config) {
502 config.setPort(8080);
504 config.setDisableNagleAlgorithm();
506 config.setIPv6Only();
// IPv6 HTTPS: port 8088, with setIPv6Only().
509#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
510 express::tls::in6::Server(
"in6-https", router, reportState, [](
auto& config) {
511 config.setPort(8088);
513 config.setDisableNagleAlgorithm();
515 config.setIPv6Only();
// Unix-domain HTTP: socket path "/tmp/<application name>-<instance name>".
520#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
521 express::legacy::un::Server(
"un-http", router, reportState, [](
auto& config) {
522 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
// Unix-domain HTTPS: same path scheme.
525#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
526 express::tls::un::Server(
"un-https", router, reportState, [](
auto& config) {
527 config.setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config.getInstanceName());
// The value returned by start() becomes the process exit code.
532 return core::SNodeC::start();