SNode.C
Loading...
Searching...
No Matches
RetainTree.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "iot/mqtt/server/broker/RetainTree.h"
43
44#include "iot/mqtt/Mqtt.h"
45#include "iot/mqtt/server/broker/Broker.h"
46
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
48
49#include "log/Logger.h"
50
51#include <algorithm>
52#include <nlohmann/json.hpp>
53#include <utility>
54
55// IWYU pragma: no_include <nlohmann/detail/iterators/iteration_proxy.hpp>
56
57#endif // DOXYGEN_SHOULD_SKIP_THIS
58
59namespace iot::mqtt::server::broker {
60
62 : head(broker) {
63 }
64
65 void RetainTree::retain(Message&& message) {
66 if (!message.getTopic().empty()) {
67 if (!message.getMessage().empty()) {
68 head.retain(message, message.getTopic());
69 } else {
71 }
72 }
73 }
74
75 void RetainTree::appear(const std::string& clientId, const std::string& topic, uint8_t qoS) {
76 head.appear(clientId, topic, qoS);
77 }
78
79 void RetainTree::release(const std::string& topic) {
80 head.release(topic);
81 }
82
83 void RetainTree::fromJson(const nlohmann::json& json) {
84 if (!json.empty()) {
86 }
87 }
88
89 std::list<std::pair<std::string, std::string>> RetainTree::getRetainedTree() const {
91 }
92
93 nlohmann::json RetainTree::toJson() const {
94 return head.toJson();
95 }
96
97 void RetainTree::clear() {
99 }
100
101 RetainTree::TopicLevel::TopicLevel(iot::mqtt::server::broker::Broker* broker)
102 : broker(broker) {
103 }
104
105 RetainTree::TopicLevel& RetainTree::TopicLevel::fromJson(const nlohmann::json& json) {
106 subTopicLevels.clear();
107
108 message.fromJson(json.value("message", nlohmann::json()));
109
110 if (json.contains("topic_level")) {
111 for (const auto& topicLevelItem : json["topic_level"].items()) {
112 subTopicLevels.emplace(topicLevelItem.key(), TopicLevel(broker).fromJson(topicLevelItem.value()));
113 }
114 }
115
116 return *this;
117 }
118 void RetainTree::TopicLevel::retain(const Message& message, std::string topic) {
119 if (topic.empty()) {
120 LOG(DEBUG) << "MQTT Broker: Retain:";
121 LOG(DEBUG) << "MQTT Broker: Topic: " << message.getTopic();
122 LOG(DEBUG) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
123 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(message.getQoS());
124
125 this->message = message;
126 } else {
127 const std::string topicLevel = topic.substr(0, topic.find('/'));
128
129 topic.erase(0, topicLevel.size() + 1);
130
131 subTopicLevels.insert({topicLevel, RetainTree::TopicLevel(broker)}).first->second.retain(message, topic);
132 }
133 }
134
135 bool RetainTree::TopicLevel::release(std::string topic) {
136 if (topic.empty()) {
137 LOG(DEBUG) << "MQTT Broker: Release retained:";
138 LOG(DEBUG) << "MQTT Broker: Topic: " << message.getTopic();
139
141 } else {
142 const std::string topicLevel = topic.substr(0, topic.find('/'));
143
144 auto&& it = subTopicLevels.find(topicLevel);
145 if (it != subTopicLevels.end()) {
146 topic.erase(0, topicLevel.size() + 1);
147
148 if (it->second.release(topic)) {
149 LOG(DEBUG) << " Erase: " << topicLevel;
150
151 subTopicLevels.erase(it);
152 }
153 }
154 }
155
156 return subTopicLevels.empty() && message.getMessage().empty();
157 }
158
159 void RetainTree::TopicLevel::appear(const std::string& clientId, std::string topic, uint8_t qoS) {
160 if (topic.empty()) {
161 if (!message.getTopic().empty()) {
162 LOG(INFO) << "MQTT Broker: Retained Topic found:";
163 LOG(INFO) << "MQTT Broker: Topic: " << message.getTopic();
164 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
165 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(message.getQoS());
166 LOG(DEBUG) << "MQTT Broker: Client:";
167 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(qoS);
168
169 LOG(INFO) << "MQTT Broker: Distributing message ...";
170 broker->sendPublish(clientId, message, std::min(message.getQoS(), qoS), true);
171 LOG(INFO) << "MQTT Broker: ... distributing message completed";
172 }
173 } else {
174 const std::string topicLevel = topic.substr(0, topic.find('/'));
175
176 topic.erase(0, topicLevel.size() + 1);
177
178 auto foundNode = subTopicLevels.find(topicLevel);
179 if (foundNode != subTopicLevels.end()) {
180 foundNode->second.appear(clientId, topic, qoS);
181 } else if (topicLevel == "+") {
182 for (auto& [notUsed, topicTree] : subTopicLevels) {
183 topicTree.appear(clientId, topic, qoS);
184 }
185 } else if (topicLevel == "#") {
186 appear(clientId, qoS);
187 }
188 }
189 }
190
191 std::list<std::pair<std::string, std::string>> RetainTree::TopicLevel::getRetainTree() const {
192 return getRetainTree("");
193 }
194
195 void RetainTree::TopicLevel::appear(const std::string& clientId, uint8_t clientQoS) {
196 if (!message.getTopic().empty()) {
197 LOG(INFO) << "MQTT Broker: Retained Topic found:";
198 LOG(INFO) << "MQTT Broker: Topic: " << message.getTopic();
199 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
200 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(message.getQoS());
201 LOG(DEBUG) << "MQTT Broker: Client:";
202 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(clientQoS);
203
204 LOG(INFO) << "MQTT Broker: Distributing message ...";
205 broker->sendPublish(clientId, message, std::min(message.getQoS(), clientQoS), true);
206 LOG(INFO) << "MQTT Broker: ... distributing message completed";
207 }
208
209 for (auto& [topicLevel, topicTree] : subTopicLevels) {
210 topicTree.appear(clientId, clientQoS);
211 }
212 }
213
214 std::list<std::pair<std::string, std::string>> RetainTree::TopicLevel::getRetainTree(const std::string& absoluteTopicLevel) const {
215 std::list<std::pair<std::string, std::string>> topicLevelTree;
216 for (const auto& [topicLevelName, nextTopicLevel] : subTopicLevels) {
217 const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;
218
219 if (!nextTopicLevel.message.getMessage().empty()) {
220 topicLevelTree.emplace_back(composedAbsoluteTopicLevelName, nextTopicLevel.message.getMessage());
221 }
222
223 topicLevelTree.splice(topicLevelTree.end(), nextTopicLevel.getRetainTree(composedAbsoluteTopicLevelName + "/"));
224 }
225
226 return topicLevelTree;
227 }
228
229 nlohmann::json RetainTree::TopicLevel::toJson() const {
230 nlohmann::json json;
231
232 if (!message.getMessage().empty()) {
233 json["message"] = message.toJson();
234 }
235
236 for (const auto& [topicLevel, topicLevelValue] : subTopicLevels) {
237 json["topic_level"][topicLevel] = topicLevelValue.toJson();
238 }
239
240 return json;
241 }
242
244 *this = TopicLevel(broker);
245 }
246
247} // namespace iot::mqtt::server::broker
static std::string toHexString(const std::string &data)
Definition Mqtt.cpp:394
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
Definition Broker.cpp:237
nlohmann::json toJson() const
Definition Message.cpp:94
const std::string & getTopic() const
Definition Message.cpp:66
const std::string & getMessage() const
Definition Message.cpp:74
Message & operator=(const Message &)=default
Message & fromJson(const nlohmann::json &json)
Definition Message.cpp:105
void retain(const Message &message, std::string topic)
std::list< std::pair< std::string, std::string > > getRetainTree(const std::string &absoluteTopicLevel) const
void appear(const std::string &clientId, uint8_t clientQoS)
TopicLevel(iot::mqtt::server::broker::Broker *broker)
TopicLevel & fromJson(const nlohmann::json &json)
void appear(const std::string &clientId, std::string topic, uint8_t qoS)
std::list< std::pair< std::string, std::string > > getRetainTree() const
std::map< std::string, TopicLevel > subTopicLevels
Definition RetainTree.h:105
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
void fromJson(const nlohmann::json &json)
std::list< std::pair< std::string, std::string > > getRetainedTree() const
RetainTree(iot::mqtt::server::broker::Broker *broker)
void release(const std::string &topic)