MQTTSuite
Loading...
Searching...
No Matches
MqttModel.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "MqttModel.h"
43
44#include "Mqtt.h"
45
46#include <core/socket/stream/SocketConnection.h>
47#include <express/Response.h>
48#include <iot/mqtt/MqttContext.h>
49#include <iot/mqtt/server/broker/Broker.h>
50#include <net/SocketAddress.h>
51#include <nlohmann/json.hpp>
52#include <web/http/server/SocketContext.h>
53
54#ifndef DOXYGEN_SHOULD_SKIP_THIS
55
56// IWYU pragma: no_include <nlohmann/detail/json_ref.hpp>
57
58#include <cstdint>
59#include <ctime>
60#include <functional>
61#include <iomanip>
62#include <log/Logger.h>
63#include <sstream>
64#include <utility>
65
66struct tm;
67
68#endif // DOXYGEN_SHOULD_SKIP_THIS
69
70namespace mqtt::mqttbroker::lib {
71
72 /*
73 {
74 "clientId": "sensor-01",
75 "protocol": "MQTT",
76 "since": "2025-12-25 10:30:00 UTC",
77 "duration": "2 days, 03:45:12",
78 "connectionName": "mqtt_connection_12345",
79 "localAddress": "127.0.0.1:1883",
80 "remoteAddress": "192.168.1.45:54321",
81 "cleanSession": true,
82 "connectFlags": 194,
83 "username": "sensor_user",
84 "usernameFlag": true,
85 "password": "secret123",
86 "passwordFlag": true,
87 "keepAlive": 60,
88 "protocolLevel": 4,
89 "loopPrevention": true,
90 "willMessage": "sensor-01 disconnected unexpectedly",
91 "willTopic": "sensors/status/sensor-01",
92 "willQoS": 1,
93 "willFlag": true,
94 "willRetain": true
95 }
96 */
97 static void to_json(nlohmann::json& j, const Mqtt* mqtt) {
98 j = {{"clientId", mqtt->getClientId()},
99 {"connectionName", mqtt->getConnectionName()},
100 {"cleanSession", mqtt->getCleanSession()},
101 {"connectFlags", mqtt->getConnectFlags()},
102 {"username", mqtt->getUsername()},
103 {"usernameFlag", mqtt->getUsernameFlag()},
104 {"password", mqtt->getPassword()},
105 {"passwordFlag", mqtt->getPasswordFlag()},
106 {"keepAlive", mqtt->getKeepAlive()},
107 {"protocol", mqtt->getProtocol()},
108 {"protocolLevel", mqtt->getLevel()},
109 {"loopPrevention", !mqtt->getReflect()},
110 {"willMessage", mqtt->getWillMessage()},
111 {"willTopic", mqtt->getWillTopic()},
112 {"willQoS", mqtt->getWillQoS()},
113 {"willFlag", mqtt->getWillFlag()},
114 {"willRetain", mqtt->getWillRetain()},
115 {"since", mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineSince()},
116 {"duration", mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineDuration()},
117 {"localAddress", mqtt->getMqttContext()->getSocketConnection()->getLocalAddress().toString()},
118 {"remoteAddress", mqtt->getMqttContext()->getSocketConnection()->getRemoteAddress().toString()}};
119 }
120
121 struct subscribe {
122 const std::string& topic;
123 const std::string& clientId;
124 uint8_t qoS;
125 };
126
127 static void to_json(nlohmann::json& j, const subscribe& subscribe) {
128 j = {{"clientId", subscribe.clientId}, {"topic", subscribe.topic}, {"qos", subscribe.qoS}};
129 }
130
131 struct unsubscribe {
132 const std::string& clientId;
133 const std::string& topic;
134 };
135
136 static void to_json(nlohmann::json& j, const unsubscribe& unsubscribe) {
137 j = {{"clientId", unsubscribe.clientId}, {"topic", unsubscribe.topic}};
138 }
139
140 struct retaine {
141 const std::string& topic;
142 const std::string& message;
143 uint8_t qoS;
144 };
145
146 static void to_json(nlohmann::json& j, const retaine& retaine) {
147 j = {{"topic", retaine.topic}, {"message", retaine.message}, {"qos", retaine.qoS}};
148 }
149
150 struct release {
151 const std::string& topic;
152 };
153
154 static void to_json(nlohmann::json& j, const release& release) {
155 j = {{"topic", release.topic}};
156 }
157
158 MqttModel::EventReceiver::EventReceiver(const std::shared_ptr<express::Response>& response)
159 : response(response)
160 , heartbeatTimer(core::timer::Timer::intervalTimer(
161 [response] {
162 response->sendFragment(":keep-alive");
163 response->sendFragment();
164 },
165 39)) {
166 }
167
171
173 return response.lock() == other.response.lock();
174 }
175
177 : onlineSinceTimePoint(std::chrono::system_clock::now()) {
178 }
179
181 static MqttModel mqttModel;
182
183 return mqttModel;
184 }
185
186 void MqttModel::addEventReceiver(const std::shared_ptr<express::Response>& response,
187 [[maybe_unused]] const std::string& lastEventId,
188 const std::shared_ptr<iot::mqtt::server::broker::Broker>& broker) {
189 auto& eventReceiver = eventReceiverList.emplace_back(response);
190
191 response->getSocketContext()->onDisconnected([this, &eventReceiver]() {
192 eventReceiverList.remove(eventReceiver);
193 });
194
195 /*
196 {
197 "title": "MQTTBroker",
198 "creator": {
199 "name": "Volker Christian",
200 "url": "https://github.com/VolkerChristian/"
201 },
202 "broker": {
203 "name": "MQTTBroker",
204 "url": "https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker"
205 },
206 "suite": {
207 "name": "MQTTSuite",
208 "url": "https://github.com/SNodeC/mqttsuite"
209 },
210 "snodec": {
211 "name": "SNode.C",
212 "url": "https://github.com/SNodeC/snode.c"
213 },
214 "since": "2025-12-25 10:30:00 UTC",
215 "duration": "2 days, 03:45:12"
216 }
217 */
218 sendJsonEvent(response,
219 {
220 {"title", "MQTTBroker"},
221 {"creator", {{"name", "Volker Christian"}, {"url", "https://github.com/VolkerChristian"}}},
222 {"broker", {{"name", "MQTTBroker"}, {"url", "https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker"}}},
223 {"suite", {{"name", "MQTTSuite"}, {"url", "https://github.com/SNodeC/mqttsuite"}}},
224 {"snodec", {{"name", "SNode.C"}, {"url", "https://github.com/SNodeC/snode.c"}}},
225 {"since", onlineSince()},
226 {"duration", onlineDuration()},
227 },
228 "ui-initialize",
229 std::to_string(id++));
230
231 for (const auto& modelMapEntry : modelMap) {
232 sendJsonEvent(response, modelMapEntry.second, "client-connected", std::to_string(id++));
233 }
234
235 for (const auto& [topic, clients] : broker->getSubscriptionTree()) {
236 for (const auto& client : clients) {
237 sendJsonEvent(response, subscribe{topic, client.first, client.second}, "client-subscribed", std::to_string(id++));
238 }
239 }
240
241 for (const auto& [topic, retained] : broker->getRetainTree()) {
242 sendJsonEvent(response, retaine{topic, retained.first, retained.second}, "retained-message-set", std::to_string(id++));
243 }
244 }
245
247 modelMap.emplace(mqtt->getClientId(), mqtt);
248
249 sendJsonEvent(mqtt, "client-connected", std::to_string(id++));
250 }
251
252 void MqttModel::disconnectClient(const std::string& clientId) {
253 if (modelMap.contains(clientId)) {
254 sendJsonEvent(modelMap[clientId], "client-disconnected", std::to_string(id++));
255
256 modelMap.erase(clientId);
257 }
258 }
259
260 void MqttModel::subscribeClient(const std::string& clientId, const std::string& topic, const uint8_t qos) {
261 sendJsonEvent(subscribe{topic, clientId, qos}, "client-subscribed", std::to_string(id++));
262 }
263
264 void MqttModel::unsubscribeClient(const std::string& clientId, const std::string& topic) {
265 sendJsonEvent(unsubscribe{clientId, topic}, "client-unsubscribed", std::to_string(id++));
266 }
267
268 void MqttModel::publishMessage(const std::string& topic, const std::string& message, uint8_t qoS, bool retain) {
269 if (retain) {
270 if (!message.empty()) {
271 sendJsonEvent(retaine{topic, message, qoS}, "retained-message-set", std::to_string(id++));
272 } else {
273 sendJsonEvent(release{topic}, "retained-message-deleted", std::to_string(id++));
274 }
275 }
276 }
277
278 const std::map<std::string, Mqtt*>& MqttModel::getClients() const {
279 return modelMap;
280 }
281
282 Mqtt* MqttModel::getMqtt(const std::string& clientId) const {
283 Mqtt* mqtt = nullptr;
284
285 auto modelIt = modelMap.find(clientId);
286 if (modelIt != modelMap.end()) {
287 mqtt = modelIt->second;
288 }
289
290 return mqtt;
291 }
292
293 std::string MqttModel::onlineSince() const {
294 return timePointToString(onlineSinceTimePoint);
295 }
296
297 std::string MqttModel::onlineDuration() const {
298 return durationToString(onlineSinceTimePoint);
299 }
300
301 void MqttModel::sendEvent(const std::shared_ptr<express::Response>& response,
302 const std::string& data,
303 const std::string& event,
304 const std::string& id) {
305 if (response->isConnected()) {
306 if (!event.empty()) {
307 response->sendFragment("event:" + event);
308 }
309 if (!id.empty()) {
310 response->sendFragment("id:" + id);
311 }
312 response->sendFragment("data:" + data);
313 response->sendFragment();
314 }
315 }
316
317 void MqttModel::sendJsonEvent(const std::shared_ptr<express::Response>& response,
318 const nlohmann::json& json,
319 const std::string& event,
320 const std::string& id) {
321 sendEvent(response, json.dump(), event, id);
322 }
323
324 void MqttModel::sendEvent(const std::string& data, const std::string& event, const std::string& id) const {
325 for (auto& eventReceiver : eventReceiverList) {
326 if (const auto& response = eventReceiver.response.lock()) {
327 sendEvent(response, data, event, id);
328 }
329 }
330 }
331
332 void MqttModel::sendJsonEvent(const nlohmann::json& json, const std::string& event, const std::string& id) const {
333 VLOG(0) << "Server sent event: " << event << "\n" << json.dump(4);
334
335 sendEvent(json.dump(), event, id);
336 }
337
338 std::string MqttModel::timePointToString(const std::chrono::time_point<std::chrono::system_clock>& timePoint) {
339 std::time_t time = std::chrono::system_clock::to_time_t(timePoint);
340 std::tm* tm_ptr = std::gmtime(&time);
341
342 char buffer[100];
343 std::string onlineSince = "Formatting error";
344
345 // Format: "2025-02-02 14:30:00"
346 if (std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", tm_ptr)) {
347 onlineSince = std::string(buffer) + " UTC";
348 }
349
350 return onlineSince;
351 }
352
353 std::string MqttModel::durationToString(const std::chrono::time_point<std::chrono::system_clock>& bevore,
354 const std::chrono::time_point<std::chrono::system_clock>& later) {
355 using seconds_duration_type = std::chrono::duration<std::chrono::seconds::rep>::rep;
356
357 seconds_duration_type totalSeconds = std::chrono::duration_cast<std::chrono::seconds>(later - bevore).count();
358
359 // Compute days, hours, minutes, and seconds
360 seconds_duration_type days = totalSeconds / 86400; // 86400 seconds in a day
361 seconds_duration_type remainder = totalSeconds % 86400;
362 seconds_duration_type hours = remainder / 3600;
363 remainder = remainder % 3600;
364 seconds_duration_type minutes = remainder / 60;
365 seconds_duration_type seconds = remainder % 60;
366
367 // Format the components into a string using stringstream
368 std::ostringstream oss;
369 if (days > 0) {
370 oss << days << " day" << (days == 1 ? "" : "s") << ", ";
371 }
372 oss << std::setw(2) << std::setfill('0') << hours << ":" << std::setw(2) << std::setfill('0') << minutes << ":" << std::setw(2)
373 << std::setfill('0') << seconds;
374
375 return oss.str();
376 }
377
378} // namespace mqtt::mqttbroker::lib
bool operator==(const EventReceiver &other)
std::weak_ptr< express::Response > response
Definition MqttModel.h:83
EventReceiver(const std::shared_ptr< express::Response > &response)
void sendEvent(const std::string &data, const std::string &event="", const std::string &id="") const
std::chrono::time_point< std::chrono::system_clock > onlineSinceTimePoint
Definition MqttModel.h:132
void publishMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
std::map< std::string, Mqtt * > modelMap
Definition MqttModel.h:130
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
void unsubscribeClient(const std::string &clientId, const std::string &topic)
std::list< EventReceiver > eventReceiverList
Definition MqttModel.h:131
const std::map< std::string, Mqtt * > & getClients() const
Mqtt * getMqtt(const std::string &clientId) const
void subscribeClient(const std::string &clientId, const std::string &topic, const uint8_t qos)
void disconnectClient(const std::string &clientId)
static void sendEvent(const std::shared_ptr< express::Response > &response, const std::string &data, const std::string &event, const std::string &id)
std::string onlineDuration() const
std::string onlineSince() const
static void to_json(nlohmann::json &j, const Mqtt *mqtt)
Definition MqttModel.cpp:97
static void to_json(nlohmann::json &j, const unsubscribe &unsubscribe)
static void to_json(nlohmann::json &j, const retaine &retaine)
static void to_json(nlohmann::json &j, const subscribe &subscribe)
static void to_json(nlohmann::json &j, const release &release)
const std::string & topic
const std::string & topic
const std::string & message