2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
46#ifndef DOXYGEN_SHOULD_SKIP_THIS
51#include <core/socket/stream/SocketConnection.h>
52#include <express/Response.h>
53#include <iot/mqtt/MqttContext.h>
54#include <iot/mqtt/server/broker/Broker.h>
55#include <net/SocketAddress.h>
56#include <nlohmann/json.hpp>
57#include <web/http/server/SocketContext.h>
64#include <log/Logger.h>
72namespace mqtt::mqttbroker::
lib {
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
101 const core::socket::stream::SocketConnection* socketConnection = mqtt->getMqttContext()->getSocketConnection();
103 j = {{
"clientId", mqtt->getClientId()},
106 {
"connectionName", mqtt->getConnectionName()},
107 {
"cleanSession", mqtt->getCleanSession()},
108 {
"connectFlags", mqtt->getConnectFlags()},
109 {
"username", mqtt->getUsername()},
110 {
"usernameFlag", mqtt->getUsernameFlag()},
111 {
"password", mqtt->getPassword()},
112 {
"passwordFlag", mqtt->getPasswordFlag()},
113 {
"keepAlive", mqtt->getKeepAlive()},
114 {
"protocol", mqtt->getProtocol()},
115 {
"protocolLevel", mqtt->getLevel()},
116 {
"loopPrevention", !mqtt->getReflect()},
117 {
"willMessage", mqtt->getWillMessage()},
118 {
"willTopic", mqtt->getWillTopic()},
119 {
"willQoS", mqtt->getWillQoS()},
120 {
"willFlag", mqtt->getWillFlag()},
121 {
"willRetain", mqtt->getWillRetain()},
122 {
"localAddress", socketConnection->getLocalAddress().toString()},
123 {
"remoteAddress", socketConnection->getRemoteAddress().toString()}};
133 j = {{
"clientId", subscribe
.clientId}, {
"topic", subscribe
.topic}, {
"qos", subscribe
.qoS}};
142 j = {{
"clientId", unsubscribe
.clientId}, {
"topic", unsubscribe
.topic}};
160 j = {{
"topic", release
.topic}};
171 return mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineSince();
175 return mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineDuration();
186 response->sendFragment(
":keep-alive");
187 response->sendFragment();
212 const std::shared_ptr<iot::
mqtt::server::
broker::Broker>& broker) {
215 response->getSocketContext()->onDisconnected([
this, &eventReceiver]() {
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242 sendJsonEvent(response,
244 {
"title",
"MQTTBroker"},
245 {
"creator", {{
"name",
"Volker Christian"}, {
"url",
"https://github.com/VolkerChristian"}}},
246 {
"broker", {{
"name",
"MQTTBroker"}, {
"url",
"https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker"}}},
247 {
"suite", {{
"name",
"MQTTSuite"}, {
"url",
"https://github.com/SNodeC/mqttsuite"}}},
248 {
"snodec", {{
"name",
"SNode.C"}, {
"url",
"https://github.com/SNodeC/snode.c"}}},
253 std::to_string(
id++));
255 for (
const auto& modelMapEntry :
modelMap) {
256 sendJsonEvent(response, modelMapEntry.second,
"client-connected", std::to_string(
id++));
259 for (
const auto& [topic, clients] : broker->getSubscriptionTree()) {
260 for (
const auto& client : clients) {
261 sendJsonEvent(response,
subscribe{topic, client.first, client.second},
"client-subscribed", std::to_string(
id++));
265 for (
const auto& [topic, retained] : broker->getRetainTree()) {
266 sendJsonEvent(response,
retaine{topic, retained.first, retained.second},
"retained-message-set", std::to_string(
id++));
273 sendJsonEvent(mqttModelEntry,
"client-connected", std::to_string(
id++));
278 sendJsonEvent(
modelMap[clientId],
"client-disconnected", std::to_string(
id++));
285 sendJsonEvent(
subscribe{topic, clientId, qos},
"client-subscribed", std::to_string(
id++));
289 sendJsonEvent(
unsubscribe{clientId, topic},
"client-unsubscribed", std::to_string(
id++));
294 if (!message.empty()) {
295 sendJsonEvent(
retaine{topic, message, qoS},
"retained-message-set", std::to_string(
id++));
297 sendJsonEvent(
release{topic},
"retained-message-deleted", std::to_string(
id++));
307 Mqtt* mqtt =
nullptr;
309 auto modelIt =
modelMap.find(clientId);
326 const std::string& data,
327 const std::string& event,
328 const std::string& id) {
329 if (response->isConnected()) {
330 if (!event.empty()) {
331 response->sendFragment(
"event:" + event);
334 response->sendFragment(
"id:" + id);
336 response->sendFragment(
"data:" + data);
337 response->sendFragment();
343 const std::string& event,
344 const std::string& id) {
348 void MqttModel::
sendEvent(
const std::string& data,
const std::string& event,
const std::string& id)
const {
350 if (
const auto& response = eventReceiver
.response.lock()) {
356 void MqttModel::sendJsonEvent(
const nlohmann::json& json,
const std::string& event,
const std::string& id)
const {
357 VLOG(0) <<
"Server sent event: " << event <<
"\n" << json.dump(4);
362 std::string
MqttModel::timePointToString(
const std::chrono::time_point<std::chrono::system_clock>& timePoint) {
363 std::time_t time = std::chrono::system_clock::to_time_t(timePoint);
364 std::tm* tm_ptr = std::gmtime(&time);
367 std::string onlineSince =
"Formatting error";
370 if (std::strftime(buffer,
sizeof(buffer),
"%Y-%m-%d %H:%M:%S", tm_ptr)) {
371 onlineSince = std::string(buffer) +
" UTC";
377 std::string
MqttModel::durationToString(
const std::chrono::time_point<std::chrono::system_clock>& bevore,
378 const std::chrono::time_point<std::chrono::system_clock>& later) {
379 using seconds_duration_type = std::chrono::duration<std::chrono::seconds::rep>::rep;
381 seconds_duration_type totalSeconds = std::chrono::duration_cast<std::chrono::seconds>(later - bevore).count();
384 seconds_duration_type days = totalSeconds / 86400;
385 seconds_duration_type remainder = totalSeconds % 86400;
386 seconds_duration_type hours = remainder / 3600;
387 remainder = remainder % 3600;
388 seconds_duration_type minutes = remainder / 60;
389 seconds_duration_type seconds = remainder % 60;
392 std::ostringstream oss;
394 oss << days <<
" day" << (days == 1 ?
"" :
"s") <<
", ";
396 oss << std::setw(2) << std::setfill(
'0') << hours <<
":" << std::setw(2) << std::setfill(
'0') << minutes <<
":" << std::setw(2)
397 << std::setfill(
'0') << seconds;
bool operator==(const EventReceiver &other)
std::weak_ptr< express::Response > response
EventReceiver(const std::shared_ptr< express::Response > &response)
core::timer::Timer heartbeatTimer
MqttModelEntry(Mqtt *mqtt)
std::string onlineSince() const
std::string onlineDuration() const
void sendEvent(const std::string &data, const std::string &event="", const std::string &id="") const
void connectClient(Mqtt *mqtt)
std::chrono::time_point< std::chrono::system_clock > onlineSinceTimePoint
void publishMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
void unsubscribeClient(const std::string &clientId, const std::string &topic)
std::list< EventReceiver > eventReceiverList
std::map< std::string, MqttModelEntry > modelMap
Mqtt * getMqtt(const std::string &clientId) const
void subscribeClient(const std::string &clientId, const std::string &topic, const uint8_t qos)
void disconnectClient(const std::string &clientId)
static void sendEvent(const std::shared_ptr< express::Response > &response, const std::string &data, const std::string &event, const std::string &id)
const std::map< std::string, MqttModelEntry > & getClients() const
std::string onlineDuration() const
std::string onlineSince() const
void to_json(nlohmann::json &j, const retaine &retaine)
void to_json(nlohmann::json &j, const release &release)
void to_json(nlohmann::json &j, const unsubscribe &unsubscribe)
void to_json(nlohmann::json &j, const MqttModel::MqttModelEntry &mqttModelEntry)
void to_json(nlohmann::json &j, const subscribe &subscribe)
const std::string & topic
const std::string & topic
const std::string & message
const std::string & clientId
const std::string & topic
const std::string & clientId
const std::string & topic