113 const std::string connectionName = res->getSocketContext()->getSocketConnection()->getConnectionName();
115 VLOG(2) << connectionName <<
" HTTP: Upgrade request:\n"
116 << httputils::toString(req->method,
118 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
123 std::vector<
char>());
125 if (req->get(
"sec-websocket-protocol").find(
"mqtt") != std::string::npos) {
126 res->upgrade(req, [req, res, connectionName](
const std::string& name) {
128 VLOG(1) << connectionName <<
": Successful upgrade:";
129 VLOG(1) << connectionName <<
": Selected: " << name;
130 VLOG(1) << connectionName <<
": Requested: " << req->get(
"sec-websocket-protocol");
134 VLOG(1) << connectionName <<
": Can not upgrade to any of '" << req->get(
"upgrade") <<
"'";
136 res->sendStatus(404);
140 VLOG(1) << connectionName <<
": Unsupported subprotocol(s):";
141 VLOG(1) <<
" Expected: mqtt";
142 VLOG(1) <<
" Requested: " << req->get(
"sec-websocket-protocol");
144 res->sendStatus(404);
149 const express::Router& jsonRouter =
express::middleware::JsonMiddleware();
152
153
154
155 jsonRouter.use(
"/api/mqtt", [] MIDDLEWARE(req, res, next) {
156 res->set({{
"Access-Control-Allow-Origin",
"*"},
157 {
"Access-Control-Allow-Headers",
"Content-Type"},
158 {
"Access-Control-Allow-Methods",
"GET, OPTIONS, POST"},
159 {
"Access-Control-Allow-Private-Network",
"true"}});
163 jsonRouter.post(
"/api/mqtt/disconnect", [] APPLICATION(req, res) {
164 VLOG(1) <<
"POST /disconnect";
166 req->getAttribute<nlohmann::json>(
167 [&res](nlohmann::json& json) {
168 std::string jsonString = json.dump(4);
170 VLOG(1) << jsonString;
172 std::string clientId = json[
"clientId"].get<std::string>();
173 const mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
175 if (mqtt !=
nullptr) {
176 mqtt->getMqttContext()->getSocketConnection()->close();
177 res->send(R"({"success": true, "message": "Client disconnected successfully"})"_json.dump());
179 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
182 [&res](
const std::string& key) {
183 VLOG(1) <<
"Attribute type not found: " << key;
185 res->status(400).send(
"Attribute type not found: " + key);
190
191
192
193 jsonRouter.post(
"/api/mqtt/unsubscribe", [] APPLICATION(req, res) {
194 VLOG(1) <<
"POST /unsubscribe";
196 req->getAttribute<nlohmann::json>(
197 [&res](nlohmann::json& json) {
198 std::string jsonString = json.dump(4);
200 VLOG(1) << jsonString;
202 std::string clientId = json[
"clientId"].get<std::string>();
203 std::string topic = json[
"topic"].get<std::string>();
205 mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
207 if (mqtt !=
nullptr) {
208 mqtt->unsubscribe(topic);
209 res->send(R"({"success": true, "message": "Client unsubscribed successfully"})"_json.dump());
211 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
214 [&res](
const std::string& key) {
215 VLOG(1) <<
"Attribute type not found: " << key;
217 res->status(400).send(
"Attribute type not found: " + key);
222
223
224
225 jsonRouter.post(
"/api/mqtt/release", [broker] APPLICATION(req, res) {
226 VLOG(1) <<
"POST /release";
228 req->getAttribute<nlohmann::json>(
229 [&res, broker](nlohmann::json& json) {
230 std::string jsonString = json.dump(4);
232 VLOG(1) << jsonString;
234 std::string topic = json[
"topic"].get<std::string>();
236 broker->publish(
"", topic,
"", 0,
true);
237 mqtt::mqttbroker::lib::MqttModel::instance().publishMessage(topic,
"", 0,
true);
239 res->send(R"({"success": true, "message": "Retained message released successfully"})"_json.dump());
241 [&res](
const std::string& key) {
242 VLOG(1) <<
"Attribute type not found: " << key;
244 res->status(400).send(
"Attribute type not found: " + key);
249
250
251
252 jsonRouter.post(
"/api/mqtt/subscribe", [] APPLICATION(req, res) {
253 VLOG(1) <<
"POST /subscribe";
255 req->getAttribute<nlohmann::json>(
256 [&res](nlohmann::json& json) {
257 std::string jsonString = json.dump(4);
259 VLOG(1) << jsonString;
261 std::string clientId = json[
"clientId"].get<std::string>();
262 std::string topic = json[
"topic"].get<std::string>();
263 uint8_t qoS = json[
"qos"].get<uint8_t>();
265 mqtt::mqttbroker::lib::Mqtt* mqtt = mqtt::mqttbroker::lib::MqttModel::instance().getMqtt(clientId);
267 if (mqtt !=
nullptr) {
268 mqtt->subscribe(topic, qoS);
270 res->send(R"({"success": true, "message": "Client subscribed successfully"})"_json.dump());
272 res->status(404).send(R"({"success": false, "error": "Client not found"})"_json.dump());
275 [&res](
const std::string& key) {
276 VLOG(1) <<
"Attribute type not found: " << key;
278 res->status(400).send(
"Attribute type not found: " + key);
283 router.use(jsonRouter);
285 router.get(
"/api/mqtt/events", [broker] APPLICATION(req, res) {
286 if (web::http::ciContains(req->get(
"Accept"),
"text/event-stream")) {
287 res->set({{
"Content-Type",
"text/event-stream"},
288 {
"Cache-Control",
"no-cache"},
289 {
"Connection",
"keep-alive"},
290 {
"Access-Control-Allow-Origin",
"*"}});
296 res->redirect(
"/clients");
300 router.get(
"/ws", [] APPLICATION(req, res) {
301 if (req->headers.contains(
"upgrade")) {
304 res->redirect(
"/clients");
308 router.get(
"/mqtt", [] APPLICATION(req, res) {
309 if (req->headers.contains(
"upgrade")) {
312 res->redirect(
"/clients");
316 router.get(
"/", [] APPLICATION(req, res) {
317 if (req->headers.contains(
"upgrade")) {
320 res->redirect(
"/clients");
324 router.get(
"/sse", [broker] APPLICATION(req, res) {
325 if (web::http::ciContains(req->get(
"Accept"),
"text/event-stream")) {
326 res->set(
"Content-Type",
"text/event-stream")
327 .set(
"Cache-Control",
"no-cache")
328 .set(
"Connection",
"keep-alive");
333 res->redirect(
"/clients");
337 router.get(
"/clients", [] APPLICATION(req, res) {
338 res->redirect(
"/clients/index.html");
341 router.use(
"/clients",
express::middleware::StaticMiddleware(webRoot));
343 router.get(
"*", [] APPLICATION(req, res) {
344 res->redirect(
"/clients/index.html");
351reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
353 case core::socket::State::OK:
354 VLOG(1) << instanceName <<
": listening on '" << socketAddress.toString() <<
"'";
356 case core::socket::State::DISABLED:
357 VLOG(1) << instanceName <<
": disabled";
359 case core::socket::State::ERROR:
360 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
362 case core::socket::State::FATAL:
363 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
368int main(
int argc,
char* argv[]) {
369 utils::Config::configRoot.newSubCommand<mqtt::
lib::
ConfigMqttBroker>()->setHtmlRoot(std::string(CMAKE_INSTALL_PREFIX) +
370 "/var/www/mqttsuite/mqttbroker");
372 core::SNodeC::init(argc, argv);
374 std::shared_ptr<iot::
mqtt::server::
broker::Broker> broker = iot::
mqtt::server::
broker::Broker::instance(
375 SUBSCRIPTION_MAX_QOS, utils::Config::configRoot.getSubCommand<mqtt::
lib::
ConfigMqttBroker>()->getSessionStore());
377#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
378 net::in::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
380 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
381 config->setPort(1883);
383 config->setDisableNagleAlgorithm();
386 .listen([](
const auto& socketAddress, core::socket::State state) {
387 reportState(
"in-mqtt", socketAddress, state);
390#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
391 net::in::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
393 [](net::in::stream::tls::config::ConfigSocketServer* config) {
394 config->setPort(8883);
396 config->setDisableNagleAlgorithm();
399 .listen([](
const auto& socketAddress, core::socket::State state) {
400 reportState(
"in-mqtts", socketAddress, state);
405#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
406 net::in6::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
408 [](net::in6::stream::legacy::config::ConfigSocketServer* config) {
409 config->setPort(1883);
411 config->setDisableNagleAlgorithm();
413 config->setIPv6Only();
416 .listen([](
const auto& socketAddress, core::socket::State state) {
417 reportState(
"in6-mqtt", socketAddress, state);
420#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
421 net::in6::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
423 [](net::in6::stream::tls::config::ConfigSocketServer* config) {
424 config->setPort(8883);
426 config->setDisableNagleAlgorithm();
428 config->setIPv6Only();
431 .listen([](
const auto& socketAddress, core::socket::State state) {
432 reportState(
"in6-mqtts", socketAddress, state);
437#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
438 net::un::stream::legacy::Server<mqtt::mqttbroker::SocketContextFactory>(
440 [](net::un::stream::legacy::config::ConfigSocketServer* config) {
441 config->setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config->getInstanceName());
445 .listen([](
const auto& socketAddress, core::socket::State state) {
446 reportState(
"un-mqtt", socketAddress, state);
449#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
450 net::un::stream::tls::Server<mqtt::mqttbroker::SocketContextFactory>(
452 [](net::un::stream::tls::config::ConfigSocketServer* config) {
453 config->setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config->getInstanceName());
457 .listen([](
const auto& socketAddress, core::socket::State state) {
458 reportState(
"un-mqtts", socketAddress, state);
462 express::Router router = getRouter(broker, utils::Config::configRoot.getSubCommand<mqtt::
lib::
ConfigMqttBroker>()->getHtmlRoot());
464#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV4
465 express::legacy::in::Server(
469 [](net::in::stream::legacy::config::ConfigSocketServer* config) {
470 config->setPort(8080);
472 config->setDisableNagleAlgorithm();
475#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV4
476 express::tls::in::Server(
480 [](net::in::stream::tls::config::ConfigSocketServer* config) {
481 config->setPort(8088);
483 config->setDisableNagleAlgorithm();
488#ifdef CONFIG_MQTTSUITE_BROKER_TCP_IPV6
489 express::legacy::in6::Server(
493 [](net::in6::stream::legacy::config::ConfigSocketServer* config) {
494 config->setPort(8080);
496 config->setDisableNagleAlgorithm();
498 config->setIPv6Only();
501#ifdef CONFIG_MQTTSUITE_BROKER_TLS_IPV6
502 express::tls::in6::Server(
506 [](net::in6::stream::tls::config::ConfigSocketServer* config) {
507 config->setPort(8088);
509 config->setDisableNagleAlgorithm();
511 config->setIPv6Only();
516#ifdef CONFIG_MQTTSUITE_BROKER_UNIX
517 express::legacy::un::Server(
521 [](net::un::stream::legacy::config::ConfigSocketServer* config) {
522 config->setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config->getInstanceName());
525#ifdef CONFIG_MQTTSUITE_BROKER_UNIX_TLS
526 express::tls::un::Server(
530 [](net::un::stream::tls::config::ConfigSocketServer* config) {
531 config->setSunPath(
"/tmp/" + utils::Config::getApplicationName() +
"-" + config->getInstanceName());
536 return core::SNodeC::start();