2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "iot/mqtt/server/broker/RetainTree.h"
44#include "iot/mqtt/Mqtt.h"
45#include "iot/mqtt/server/broker/Broker.h"
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
49#include "log/Logger.h"
52#include <nlohmann/json.hpp>
59namespace iot::mqtt::server::
broker {
75 void RetainTree::
appear(
const std::string& clientId,
const std::string& topic, uint8_t qoS) {
110 if (json.contains(
"topic_level")) {
111 for (
const auto& topicLevelItem : json[
"topic_level"].items()) {
120 LOG(DEBUG) <<
"MQTT Broker: Retain:";
121 LOG(DEBUG) <<
"MQTT Broker: Topic: " << message
.getTopic();
123 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(message
.getQoS());
127 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
129 topic.erase(0, topicLevel.size() + 1);
137 LOG(DEBUG) <<
"MQTT Broker: Release retained:";
142 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
146 topic.erase(0, topicLevel.size() + 1);
149 LOG(DEBUG) <<
" Erase: " << topicLevel;
162 LOG(INFO) <<
"MQTT Broker: Retained Topic found:";
165 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(
message.getQoS());
166 LOG(DEBUG) <<
"MQTT Broker: Client:";
167 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(qoS);
169 LOG(INFO) <<
"MQTT Broker: Distributing message ...";
171 LOG(INFO) <<
"MQTT Broker: ... distributing message completed";
174 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
176 topic.erase(0, topicLevel.size() + 1);
181 }
else if (topicLevel ==
"+") {
185 }
else if (topicLevel ==
"#") {
197 LOG(INFO) <<
"MQTT Broker: Retained Topic found:";
200 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(
message.getQoS());
201 LOG(DEBUG) <<
"MQTT Broker: Client:";
202 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(clientQoS);
204 LOG(INFO) <<
"MQTT Broker: Distributing message ...";
206 LOG(INFO) <<
"MQTT Broker: ... distributing message completed";
215 std::list<std::pair<std::string, std::string>> topicLevelTree;
216 for (
const auto& [topicLevelName, nextTopicLevel] :
subTopicLevels) {
217 const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;
223 topicLevelTree.splice(topicLevelTree.end(), nextTopicLevel
.getRetainTree(composedAbsoluteTopicLevelName +
"/"));
226 return topicLevelTree;
236 for (
const auto& [topicLevel, topicLevelValue] :
subTopicLevels) {
237 json[
"topic_level"][topicLevel] = topicLevelValue
.toJson();
static std::string toHexString(const std::string &data)
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
nlohmann::json toJson() const
const std::string & getTopic() const
const std::string & getMessage() const
Message & operator=(const Message &)=default
Message & fromJson(const nlohmann::json &json)
void retain(const Message &message, std::string topic)
std::list< std::pair< std::string, std::string > > getRetainTree(const std::string &absoluteTopicLevel) const
void appear(const std::string &clientId, uint8_t clientQoS)
TopicLevel(iot::mqtt::server::broker::Broker *broker)
TopicLevel & fromJson(const nlohmann::json &json)
void appear(const std::string &clientId, std::string topic, uint8_t qoS)
nlohmann::json toJson() const
std::list< std::pair< std::string, std::string > > getRetainTree() const
std::map< std::string, TopicLevel > subTopicLevels
bool release(std::string topic)
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
void retain(Message &&message)
void fromJson(const nlohmann::json &json)
std::list< std::pair< std::string, std::string > > getRetainedTree() const
RetainTree(iot::mqtt::server::broker::Broker *broker)
nlohmann::json toJson() const
void release(const std::string &topic)