SNode.C
Loading...
Searching...
No Matches
RetainTree.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "iot/mqtt/server/broker/RetainTree.h"
21
22#include "iot/mqtt/Mqtt.h"
23#include "iot/mqtt/server/broker/Broker.h"
24
25#ifndef DOXYGEN_SHOULD_SKIP_THIS
26
27#include "log/Logger.h"
28
29#include <algorithm>
30#include <nlohmann/json.hpp>
31#include <utility>
32
33// IWYU pragma: no_include <nlohmann/detail/iterators/iteration_proxy.hpp>
34
35#endif // DOXYGEN_SHOULD_SKIP_THIS
36
37namespace iot::mqtt::server::broker {
38
40 : head(broker) {
41 }
42
43 void RetainTree::retain(Message&& message) {
44 if (!message.getTopic().empty()) {
45 if (!message.getMessage().empty()) {
46 head.retain(message, message.getTopic());
47 } else {
48 head.release(message.getTopic());
49 }
50 }
51 }
52
53 void RetainTree::appear(const std::string& clientId, const std::string& topic, uint8_t qoS) {
54 head.appear(clientId, topic, qoS);
55 }
56
57 void RetainTree::fromJson(const nlohmann::json& json) {
58 if (!json.empty()) {
59 head.fromJson(json);
60 }
61 }
62
64 return head.toJson();
65 }
66
67 void RetainTree::clear() {
68 head.clear();
69 }
70
71 RetainTree::TopicLevel::TopicLevel(iot::mqtt::server::broker::Broker* broker)
72 : broker(broker) {
73 }
74
75 RetainTree::TopicLevel& RetainTree::TopicLevel::fromJson(const nlohmann::json& json) {
76 subTopicLevels.clear();
77
78 message.fromJson(json.value("message", nlohmann::json()));
79
80 if (json.contains("topic_level")) {
81 for (const auto& topicLevelItem : json["topic_level"].items()) {
82 subTopicLevels.emplace(topicLevelItem.key(), TopicLevel(broker).fromJson(topicLevelItem.value()));
83 }
84 }
85
86 return *this;
87 }
88 void RetainTree::TopicLevel::retain(const Message& message, std::string topic) {
89 if (topic.empty()) {
90 LOG(DEBUG) << "MQTT Broker: Retain:";
91 LOG(DEBUG) << "MQTT Broker: Topic: " << message.getTopic();
92 LOG(DEBUG) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
93 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(message.getQoS());
94
95 this->message = message;
96 } else {
97 const std::string topicLevel = topic.substr(0, topic.find('/'));
98
99 topic.erase(0, topicLevel.size() + 1);
100
101 subTopicLevels.insert({topicLevel, RetainTree::TopicLevel(broker)}).first->second.retain(message, topic);
102 }
103 }
104
105 bool RetainTree::TopicLevel::release(std::string topic) {
106 if (topic.empty()) {
107 LOG(DEBUG) << "MQTT Broker: Release retained:";
108 LOG(DEBUG) << "MQTT Broker: Topic: " << message.getTopic();
109
110 message = Message();
111 } else {
112 const std::string topicLevel = topic.substr(0, topic.find('/'));
113
114 auto&& it = subTopicLevels.find(topicLevel);
115 if (it != subTopicLevels.end()) {
116 topic.erase(0, topicLevel.size() + 1);
117
118 if (it->second.release(topic)) {
119 LOG(DEBUG) << " Erase: " << topicLevel;
120
121 subTopicLevels.erase(it);
122 }
123 }
124 }
125
126 return subTopicLevels.empty() && message.getMessage().empty();
127 }
128
129 void RetainTree::TopicLevel::appear(const std::string& clientId, std::string topic, uint8_t qoS) {
130 if (topic.empty()) {
131 if (!message.getTopic().empty()) {
132 LOG(INFO) << "MQTT Broker: Retained Topic found:";
133 LOG(INFO) << "MQTT Broker: Topic: " << message.getTopic();
134 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
135 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(message.getQoS());
136 LOG(DEBUG) << "MQTT Broker: Client:";
137 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(qoS);
138
139 LOG(INFO) << "MQTT Broker: Distributing message ...";
140 broker->sendPublish(clientId, message, std::min(message.getQoS(), qoS), true);
141 LOG(INFO) << "MQTT Broker: ... distributing message completed";
142 }
143 } else {
144 const std::string topicLevel = topic.substr(0, topic.find('/'));
145
146 topic.erase(0, topicLevel.size() + 1);
147
148 auto foundNode = subTopicLevels.find(topicLevel);
149 if (foundNode != subTopicLevels.end()) {
150 foundNode->second.appear(clientId, topic, qoS);
151 } else if (topicLevel == "+") {
152 for (auto& [notUsed, topicTree] : subTopicLevels) {
153 topicTree.appear(clientId, topic, qoS);
154 }
155 } else if (topicLevel == "#") {
156 appear(clientId, qoS);
157 }
158 }
159 }
160
161 void RetainTree::TopicLevel::appear(const std::string& clientId, uint8_t clientQoS) {
162 if (!message.getTopic().empty()) {
163 LOG(INFO) << "MQTT Broker: Retained Topic found:";
164 LOG(INFO) << "MQTT Broker: Topic: " << message.getTopic();
165 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
166 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(message.getQoS());
167 LOG(DEBUG) << "MQTT Broker: Client:";
168 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(clientQoS);
169
170 LOG(INFO) << "MQTT Broker: Distributing message ...";
171 broker->sendPublish(clientId, message, std::min(message.getQoS(), clientQoS), true);
172 LOG(INFO) << "MQTT Broker: ... distributing message completed";
173 }
174
175 for (auto& [topicLevel, topicTree] : subTopicLevels) {
176 topicTree.appear(clientId, clientQoS);
177 }
178 }
179
181 nlohmann::json json;
182
183 if (!message.getMessage().empty()) {
184 json["message"] = message.toJson();
185 }
186
187 for (const auto& [topicLevel, topicLevelValue] : subTopicLevels) {
188 json["topic_level"][topicLevel] = topicLevelValue.toJson();
189 }
190
191 return json;
192 }
193
195 *this = TopicLevel(broker);
196 }
197
198} // namespace iot::mqtt::server::broker
void retain(const Message &message, std::string topic)
void appear(const std::string &clientId, uint8_t clientQoS)
TopicLevel(iot::mqtt::server::broker::Broker *broker)
TopicLevel & fromJson(const nlohmann::json &json)
void appear(const std::string &clientId, std::string topic, uint8_t qoS)
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
void fromJson(const nlohmann::json &json)
RetainTree(iot::mqtt::server::broker::Broker *broker)