MQTTSuite
Loading...
Searching...
No Matches
MqttModel.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "MqttModel.h"
43
44#include "Mqtt.h"
45
46#ifndef DOXYGEN_SHOULD_SKIP_THIS
47
48// #include <cctype>
49// #include <core/socket/SocketAddress.h>
50
51#include <core/socket/stream/SocketConnection.h>
52#include <express/Response.h>
53#include <iot/mqtt/MqttContext.h>
54#include <iot/mqtt/server/broker/Broker.h>
55#include <net/SocketAddress.h>
56#include <nlohmann/json.hpp>
57#include <web/http/server/SocketContext.h>
58
59// IWYU pragma: no_include <nlohmann/detail/json_ref.hpp>
60
61#include <ctime>
62#include <functional>
63#include <iomanip>
64#include <log/Logger.h>
65#include <sstream>
66#include <utility>
67
68struct tm;
69
70#endif // DOXYGEN_SHOULD_SKIP_THIS
71
72namespace mqtt::mqttbroker::lib {
73
74 /*
75 {
76 "clientId": "sensor-01",
77 "protocol": "MQTT",
78 "since": "2025-12-25 10:30:00 UTC",
79 "duration": "2 days, 03:45:12",
80 "connectionName": "mqtt_connection_12345",
81 "localAddress": "127.0.0.1:1883",
82 "remoteAddress": "192.168.1.45:54321",
83 "cleanSession": true,
84 "connectFlags": 194,
85 "username": "sensor_user",
86 "usernameFlag": true,
87 "password": "secret123",
88 "passwordFlag": true,
89 "keepAlive": 60,
90 "protocolLevel": 4,
91 "loopPrevention": true,
92 "willMessage": "sensor-01 disconnected unexpectedly",
93 "willTopic": "sensors/status/sensor-01",
94 "willQoS": 1,
95 "willFlag": true,
96 "willRetain": true
97 }
98 */
99 void to_json(nlohmann::json& j, const MqttModel::MqttModelEntry& mqttModelEntry) {
100 const Mqtt* mqtt = mqttModelEntry.getMqtt();
101 const core::socket::stream::SocketConnection* socketConnection = mqtt->getMqttContext()->getSocketConnection();
102
103 j = {{"clientId", mqtt->getClientId()},
104 {"since", mqttModelEntry.onlineSince()},
105 {"duration", mqttModelEntry.onlineDuration()},
106 {"connectionName", mqtt->getConnectionName()},
107 {"cleanSession", mqtt->getCleanSession()},
108 {"connectFlags", mqtt->getConnectFlags()},
109 {"username", mqtt->getUsername()},
110 {"usernameFlag", mqtt->getUsernameFlag()},
111 {"password", mqtt->getPassword()},
112 {"passwordFlag", mqtt->getPasswordFlag()},
113 {"keepAlive", mqtt->getKeepAlive()},
114 {"protocol", mqtt->getProtocol()},
115 {"protocolLevel", mqtt->getLevel()},
116 {"loopPrevention", !mqtt->getReflect()},
117 {"willMessage", mqtt->getWillMessage()},
118 {"willTopic", mqtt->getWillTopic()},
119 {"willQoS", mqtt->getWillQoS()},
120 {"willFlag", mqtt->getWillFlag()},
121 {"willRetain", mqtt->getWillRetain()},
122 {"localAddress", socketConnection->getLocalAddress().toString()},
123 {"remoteAddress", socketConnection->getRemoteAddress().toString()}};
124 }
125
126 struct subscribe {
127 const std::string& topic;
128 const std::string& clientId;
129 uint8_t qoS;
130 };
131
132 void to_json(nlohmann::json& j, const subscribe& subscribe) {
133 j = {{"clientId", subscribe.clientId}, {"topic", subscribe.topic}, {"qos", subscribe.qoS}};
134 }
135
136 struct unsubscribe {
137 const std::string& clientId;
138 const std::string& topic;
139 };
140
141 void to_json(nlohmann::json& j, const unsubscribe& unsubscribe) {
142 j = {{"clientId", unsubscribe.clientId}, {"topic", unsubscribe.topic}};
143 }
144
145 struct retaine {
146 const std::string& topic;
147 const std::string& message;
148 uint8_t qoS;
149 };
150
151 void to_json(nlohmann::json& j, const retaine& retaine) {
152 j = {{"topic", retaine.topic}, {"message", retaine.message}, {"qos", retaine.qoS}};
153 }
154
155 struct release {
156 const std::string& topic;
157 };
158
159 void to_json(nlohmann::json& j, const release& release) {
160 j = {{"topic", release.topic}};
161 }
162
164 : mqtt(mqtt) {
165 }
166
169
170 std::string MqttModel::MqttModelEntry::onlineSince() const {
171 return mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineSince();
172 }
173
175 return mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineDuration();
176 }
177
179 return mqtt;
180 }
181
182 MqttModel::EventReceiver::EventReceiver(const std::shared_ptr<express::Response>& response)
183 : response(response)
184 , heartbeatTimer(core::timer::Timer::intervalTimer(
185 [response] {
186 response->sendFragment(":keep-alive");
187 response->sendFragment();
188 },
189 39)) {
190 }
191
195
197 return response.lock() == other.response.lock();
198 }
199
201 : onlineSinceTimePoint(std::chrono::system_clock::now()) {
202 }
203
205 static MqttModel mqttModel;
206
207 return mqttModel;
208 }
209
210 void MqttModel::addEventReceiver(const std::shared_ptr<express::Response>& response,
211 [[maybe_unused]] const std::string& lastEventId,
212 const std::shared_ptr<iot::mqtt::server::broker::Broker>& broker) {
213 auto& eventReceiver = eventReceiverList.emplace_back(response);
214
215 response->getSocketContext()->onDisconnected([this, &eventReceiver]() {
216 eventReceiverList.remove(eventReceiver);
217 });
218
219 /*
220 {
221 "title": "MQTTBroker",
222 "creator": {
223 "name": "Volker Christian",
224 "url": "https://github.com/VolkerChristian/"
225 },
226 "broker": {
227 "name": "MQTTBroker",
228 "url": "https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker"
229 },
230 "suite": {
231 "name": "MQTTSuite",
232 "url": "https://github.com/SNodeC/mqttsuite"
233 },
234 "snodec": {
235 "name": "SNode.C",
236 "url": "https://github.com/SNodeC/snode.c"
237 },
238 "since": "2025-12-25 10:30:00 UTC",
239 "duration": "2 days, 03:45:12"
240 }
241 */
242 sendJsonEvent(response,
243 {
244 {"title", "MQTTBroker"},
245 {"creator", {{"name", "Volker Christian"}, {"url", "https://github.com/VolkerChristian"}}},
246 {"broker", {{"name", "MQTTBroker"}, {"url", "https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker"}}},
247 {"suite", {{"name", "MQTTSuite"}, {"url", "https://github.com/SNodeC/mqttsuite"}}},
248 {"snodec", {{"name", "SNode.C"}, {"url", "https://github.com/SNodeC/snode.c"}}},
249 {"since", onlineSince()},
250 {"duration", onlineDuration()},
251 },
252 "ui-initialize",
253 std::to_string(id++));
254
255 for (const auto& modelMapEntry : modelMap) {
256 sendJsonEvent(response, modelMapEntry.second, "client-connected", std::to_string(id++));
257 }
258
259 for (const auto& [topic, clients] : broker->getSubscriptionTree()) {
260 for (const auto& client : clients) {
261 sendJsonEvent(response, subscribe{topic, client.first, client.second}, "client-subscribed", std::to_string(id++));
262 }
263 }
264
265 for (const auto& [topic, retained] : broker->getRetainTree()) {
266 sendJsonEvent(response, retaine{topic, retained.first, retained.second}, "retained-message-set", std::to_string(id++));
267 }
268 }
269
271 MqttModelEntry& mqttModelEntry = modelMap.emplace(mqtt->getClientId(), mqtt).first->second;
272
273 sendJsonEvent(mqttModelEntry, "client-connected", std::to_string(id++));
274 }
275
276 void MqttModel::disconnectClient(const std::string& clientId) {
277 if (modelMap.contains(clientId)) {
278 sendJsonEvent(modelMap[clientId], "client-disconnected", std::to_string(id++));
279
280 modelMap.erase(clientId);
281 }
282 }
283
284 void MqttModel::subscribeClient(const std::string& clientId, const std::string& topic, const uint8_t qos) {
285 sendJsonEvent(subscribe{topic, clientId, qos}, "client-subscribed", std::to_string(id++));
286 }
287
288 void MqttModel::unsubscribeClient(const std::string& clientId, const std::string& topic) {
289 sendJsonEvent(unsubscribe{clientId, topic}, "client-unsubscribed", std::to_string(id++));
290 }
291
292 void MqttModel::publishMessage(const std::string& topic, const std::string& message, uint8_t qoS, bool retain) {
293 if (retain) {
294 if (!message.empty()) {
295 sendJsonEvent(retaine{topic, message, qoS}, "retained-message-set", std::to_string(id++));
296 } else {
297 sendJsonEvent(release{topic}, "retained-message-deleted", std::to_string(id++));
298 }
299 }
300 }
301
302 const std::map<std::string, MqttModel::MqttModelEntry>& MqttModel::getClients() const {
303 return modelMap;
304 }
305
306 Mqtt* MqttModel::getMqtt(const std::string& clientId) const {
307 Mqtt* mqtt = nullptr;
308
309 auto modelIt = modelMap.find(clientId);
310 if (modelIt != modelMap.end()) {
311 mqtt = modelIt->second.getMqtt();
312 }
313
314 return mqtt;
315 }
316
317 std::string MqttModel::onlineSince() const {
318 return timePointToString(onlineSinceTimePoint);
319 }
320
321 std::string MqttModel::onlineDuration() const {
322 return durationToString(onlineSinceTimePoint);
323 }
324
325 void MqttModel::sendEvent(const std::shared_ptr<express::Response>& response,
326 const std::string& data,
327 const std::string& event,
328 const std::string& id) {
329 if (response->isConnected()) {
330 if (!event.empty()) {
331 response->sendFragment("event:" + event);
332 }
333 if (!id.empty()) {
334 response->sendFragment("id:" + id);
335 }
336 response->sendFragment("data:" + data);
337 response->sendFragment();
338 }
339 }
340
341 void MqttModel::sendJsonEvent(const std::shared_ptr<express::Response>& response,
342 const nlohmann::json& json,
343 const std::string& event,
344 const std::string& id) {
345 sendEvent(response, json.dump(), event, id);
346 }
347
348 void MqttModel::sendEvent(const std::string& data, const std::string& event, const std::string& id) const {
349 for (auto& eventReceiver : eventReceiverList) {
350 if (const auto& response = eventReceiver.response.lock()) {
351 sendEvent(response, data, event, id);
352 }
353 }
354 }
355
356 void MqttModel::sendJsonEvent(const nlohmann::json& json, const std::string& event, const std::string& id) const {
357 VLOG(0) << "Server sent event: " << event << "\n" << json.dump(4);
358
359 sendEvent(json.dump(), event, id);
360 }
361
362 std::string MqttModel::timePointToString(const std::chrono::time_point<std::chrono::system_clock>& timePoint) {
363 std::time_t time = std::chrono::system_clock::to_time_t(timePoint);
364 std::tm* tm_ptr = std::gmtime(&time);
365
366 char buffer[100];
367 std::string onlineSince = "Formatting error";
368
369 // Format: "2025-02-02 14:30:00"
370 if (std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", tm_ptr)) {
371 onlineSince = std::string(buffer) + " UTC";
372 }
373
374 return onlineSince;
375 }
376
377 std::string MqttModel::durationToString(const std::chrono::time_point<std::chrono::system_clock>& bevore,
378 const std::chrono::time_point<std::chrono::system_clock>& later) {
379 using seconds_duration_type = std::chrono::duration<std::chrono::seconds::rep>::rep;
380
381 seconds_duration_type totalSeconds = std::chrono::duration_cast<std::chrono::seconds>(later - bevore).count();
382
383 // Compute days, hours, minutes, and seconds
384 seconds_duration_type days = totalSeconds / 86400; // 86400 seconds in a day
385 seconds_duration_type remainder = totalSeconds % 86400;
386 seconds_duration_type hours = remainder / 3600;
387 remainder = remainder % 3600;
388 seconds_duration_type minutes = remainder / 60;
389 seconds_duration_type seconds = remainder % 60;
390
391 // Format the components into a string using stringstream
392 std::ostringstream oss;
393 if (days > 0) {
394 oss << days << " day" << (days == 1 ? "" : "s") << ", ";
395 }
396 oss << std::setw(2) << std::setfill('0') << hours << ":" << std::setw(2) << std::setfill('0') << minutes << ":" << std::setw(2)
397 << std::setfill('0') << seconds;
398
399 return oss.str();
400 }
401
402} // namespace mqtt::mqttbroker::lib
bool operator==(const EventReceiver &other)
std::weak_ptr< express::Response > response
Definition MqttModel.h:100
EventReceiver(const std::shared_ptr< express::Response > &response)
void sendEvent(const std::string &data, const std::string &event="", const std::string &id="") const
std::chrono::time_point< std::chrono::system_clock > onlineSinceTimePoint
Definition MqttModel.h:149
void publishMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
void unsubscribeClient(const std::string &clientId, const std::string &topic)
std::list< EventReceiver > eventReceiverList
Definition MqttModel.h:148
std::map< std::string, MqttModelEntry > modelMap
Definition MqttModel.h:147
Mqtt * getMqtt(const std::string &clientId) const
void subscribeClient(const std::string &clientId, const std::string &topic, const uint8_t qos)
void disconnectClient(const std::string &clientId)
static void sendEvent(const std::shared_ptr< express::Response > &response, const std::string &data, const std::string &event, const std::string &id)
const std::map< std::string, MqttModelEntry > & getClients() const
std::string onlineDuration() const
std::string onlineSince() const
void to_json(nlohmann::json &j, const retaine &retaine)
void to_json(nlohmann::json &j, const release &release)
void to_json(nlohmann::json &j, const unsubscribe &unsubscribe)
void to_json(nlohmann::json &j, const MqttModel::MqttModelEntry &mqttModelEntry)
Definition MqttModel.cpp:99
void to_json(nlohmann::json &j, const subscribe &subscribe)
const std::string & topic
const std::string & topic
const std::string & message