2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
46#include <core/socket/stream/SocketConnection.h>
47#include <express/Response.h>
48#include <iot/mqtt/MqttContext.h>
49#include <iot/mqtt/server/broker/Broker.h>
50#include <net/SocketAddress.h>
51#include <nlohmann/json.hpp>
52#include <web/http/server/SocketContext.h>
54#ifndef DOXYGEN_SHOULD_SKIP_THIS
62#include <log/Logger.h>
70namespace mqtt::mqttbroker::
lib {
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
98 j = {{
"clientId", mqtt->getClientId()},
99 {
"connectionName", mqtt->getConnectionName()},
100 {
"cleanSession", mqtt->getCleanSession()},
101 {
"connectFlags", mqtt->getConnectFlags()},
102 {
"username", mqtt->getUsername()},
103 {
"usernameFlag", mqtt->getUsernameFlag()},
104 {
"password", mqtt->getPassword()},
105 {
"passwordFlag", mqtt->getPasswordFlag()},
106 {
"keepAlive", mqtt->getKeepAlive()},
107 {
"protocol", mqtt->getProtocol()},
108 {
"protocolLevel", mqtt->getLevel()},
109 {
"loopPrevention", !mqtt->getReflect()},
110 {
"willMessage", mqtt->getWillMessage()},
111 {
"willTopic", mqtt->getWillTopic()},
112 {
"willQoS", mqtt->getWillQoS()},
113 {
"willFlag", mqtt->getWillFlag()},
114 {
"willRetain", mqtt->getWillRetain()},
115 {
"since", mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineSince()},
116 {
"duration", mqtt->getMqttContext()->getSocketConnection()->getSocketContext()->getOnlineDuration()},
117 {
"localAddress", mqtt->getMqttContext()->getSocketConnection()->getLocalAddress().toString()},
118 {
"remoteAddress", mqtt->getMqttContext()->getSocketConnection()->getRemoteAddress().toString()}};
128 j = {{
"clientId", subscribe
.clientId}, {
"topic", subscribe
.topic}, {
"qos", subscribe
.qoS}};
137 j = {{
"clientId", unsubscribe
.clientId}, {
"topic", unsubscribe
.topic}};
155 j = {{
"topic", release
.topic}};
162 response->sendFragment(
":keep-alive");
163 response->sendFragment();
188 const std::shared_ptr<iot::
mqtt::server::
broker::Broker>& broker) {
191 response->getSocketContext()->onDisconnected([
this, &eventReceiver]() {
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 sendJsonEvent(response,
220 {
"title",
"MQTTBroker"},
221 {
"creator", {{
"name",
"Volker Christian"}, {
"url",
"https://github.com/VolkerChristian"}}},
222 {
"broker", {{
"name",
"MQTTBroker"}, {
"url",
"https://github.com/SNodeC/mqttsuite/tree/master/mqttbroker"}}},
223 {
"suite", {{
"name",
"MQTTSuite"}, {
"url",
"https://github.com/SNodeC/mqttsuite"}}},
224 {
"snodec", {{
"name",
"SNode.C"}, {
"url",
"https://github.com/SNodeC/snode.c"}}},
229 std::to_string(
id++));
231 for (
const auto& modelMapEntry :
modelMap) {
232 sendJsonEvent(response, modelMapEntry.second,
"client-connected", std::to_string(
id++));
235 for (
const auto& [topic, clients] : broker->getSubscriptionTree()) {
236 for (
const auto& client : clients) {
237 sendJsonEvent(response,
subscribe{topic, client.first, client.second},
"client-subscribed", std::to_string(
id++));
241 for (
const auto& [topic, retained] : broker->getRetainTree()) {
242 sendJsonEvent(response,
retaine{topic, retained.first, retained.second},
"retained-message-set", std::to_string(
id++));
247 modelMap.emplace(mqtt->getClientId(), mqtt);
249 sendJsonEvent(mqtt,
"client-connected", std::to_string(
id++));
254 sendJsonEvent(
modelMap[clientId],
"client-disconnected", std::to_string(
id++));
261 sendJsonEvent(
subscribe{topic, clientId, qos},
"client-subscribed", std::to_string(
id++));
265 sendJsonEvent(
unsubscribe{clientId, topic},
"client-unsubscribed", std::to_string(
id++));
270 if (!message.empty()) {
271 sendJsonEvent(
retaine{topic, message, qoS},
"retained-message-set", std::to_string(
id++));
273 sendJsonEvent(
release{topic},
"retained-message-deleted", std::to_string(
id++));
283 Mqtt* mqtt =
nullptr;
285 auto modelIt =
modelMap.find(clientId);
287 mqtt = modelIt->second;
302 const std::string& data,
303 const std::string& event,
304 const std::string& id) {
305 if (response->isConnected()) {
306 if (!event.empty()) {
307 response->sendFragment(
"event:" + event);
310 response->sendFragment(
"id:" + id);
312 response->sendFragment(
"data:" + data);
313 response->sendFragment();
319 const std::string& event,
320 const std::string& id) {
324 void MqttModel::
sendEvent(
const std::string& data,
const std::string& event,
const std::string& id)
const {
326 if (
const auto& response = eventReceiver
.response.lock()) {
332 void MqttModel::sendJsonEvent(
const nlohmann::json& json,
const std::string& event,
const std::string& id)
const {
333 VLOG(0) <<
"Server sent event: " << event <<
"\n" << json.dump(4);
338 std::string
MqttModel::timePointToString(
const std::chrono::time_point<std::chrono::system_clock>& timePoint) {
339 std::time_t time = std::chrono::system_clock::to_time_t(timePoint);
340 std::tm* tm_ptr = std::gmtime(&time);
343 std::string onlineSince =
"Formatting error";
346 if (std::strftime(buffer,
sizeof(buffer),
"%Y-%m-%d %H:%M:%S", tm_ptr)) {
347 onlineSince = std::string(buffer) +
" UTC";
353 std::string
MqttModel::durationToString(
const std::chrono::time_point<std::chrono::system_clock>& bevore,
354 const std::chrono::time_point<std::chrono::system_clock>& later) {
355 using seconds_duration_type = std::chrono::duration<std::chrono::seconds::rep>::rep;
357 seconds_duration_type totalSeconds = std::chrono::duration_cast<std::chrono::seconds>(later - bevore).count();
360 seconds_duration_type days = totalSeconds / 86400;
361 seconds_duration_type remainder = totalSeconds % 86400;
362 seconds_duration_type hours = remainder / 3600;
363 remainder = remainder % 3600;
364 seconds_duration_type minutes = remainder / 60;
365 seconds_duration_type seconds = remainder % 60;
368 std::ostringstream oss;
370 oss << days <<
" day" << (days == 1 ?
"" :
"s") <<
", ";
372 oss << std::setw(2) << std::setfill(
'0') << hours <<
":" << std::setw(2) << std::setfill(
'0') << minutes <<
":" << std::setw(2)
373 << std::setfill(
'0') << seconds;
bool operator==(const EventReceiver &other)
std::weak_ptr< express::Response > response
EventReceiver(const std::shared_ptr< express::Response > &response)
core::timer::Timer heartbeatTimer
void sendEvent(const std::string &data, const std::string &event="", const std::string &id="") const
void connectClient(Mqtt *mqtt)
std::chrono::time_point< std::chrono::system_clock > onlineSinceTimePoint
void publishMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
std::map< std::string, Mqtt * > modelMap
void addEventReceiver(const std::shared_ptr< express::Response > &response, const std::string &lastEventId, const std::shared_ptr< iot::mqtt::server::broker::Broker > &broker)
static MqttModel & instance()
void unsubscribeClient(const std::string &clientId, const std::string &topic)
std::list< EventReceiver > eventReceiverList
const std::map< std::string, Mqtt * > & getClients() const
Mqtt * getMqtt(const std::string &clientId) const
void subscribeClient(const std::string &clientId, const std::string &topic, const uint8_t qos)
void disconnectClient(const std::string &clientId)
static void sendEvent(const std::shared_ptr< express::Response > &response, const std::string &data, const std::string &event, const std::string &id)
std::string onlineDuration() const
std::string onlineSince() const
static void to_json(nlohmann::json &j, const Mqtt *mqtt)
static void to_json(nlohmann::json &j, const unsubscribe &unsubscribe)
static void to_json(nlohmann::json &j, const retaine &retaine)
static void to_json(nlohmann::json &j, const subscribe &subscribe)
static void to_json(nlohmann::json &j, const release &release)
const std::string & topic
const std::string & topic
const std::string & message
const std::string & clientId
const std::string & topic
const std::string & clientId
const std::string & topic