SNode.C
Loading...
Searching...
No Matches
SubscribtionTree.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "iot/mqtt/server/broker/SubscribtionTree.h"
21
22#include "iot/mqtt/Mqtt.h"
23#include "iot/mqtt/server/broker/Broker.h"
24
25#ifndef DOXYGEN_SHOULD_SKIP_THIS
26
27#include "log/Logger.h"
28
29#include <algorithm>
30#include <nlohmann/json.hpp>
31#include <string>
32#include <utility>
33
34// IWYU pragma: no_include <nlohmann/detail/iterators/iteration_proxy.hpp>
35
36#endif // DOXYGEN_SHOULD_SKIP_THIS
37
38namespace iot::mqtt::server::broker {
39
40 SubscribtionTree::SubscribtionTree(iot::mqtt::server::broker::Broker* broker)
41 : head(broker, "") {
42 }
43
44 void SubscribtionTree::appear(const std::string& clientId) {
45 head.appear(clientId, "");
46 }
47
48 bool SubscribtionTree::subscribe(const std::string& topic, const std::string& clientId, uint8_t qoS) {
49 bool success = false;
50
51 auto hashCount = std::ranges::count(topic, '#');
52 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with('#')))) {
53 head.subscribe(clientId, qoS, topic);
54
55 success = true;
56 } else {
57 LOG(ERROR) << "MQTT Broker: Subscribe: Wrong '#' placement: " << topic;
58 }
59
60 return success;
61 }
62
63 void SubscribtionTree::publish(Message&& message) {
64 auto hashCount = std::ranges::count(message.getTopic(), '#');
65 if (!message.getTopic().empty() && (hashCount == 0 || (hashCount == 1 && message.getTopic().ends_with('#')))) {
66 head.publish(message, message.getTopic());
67 } else {
68 LOG(ERROR) << "MQTT Broker: Publish: Wrong '#' placement: " << message.getTopic();
69 }
70 }
71
72 void SubscribtionTree::unsubscribe(const std::string& topic, const std::string& clientId) {
73 auto hashCount = std::ranges::count(topic, '#');
74 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with('#')))) {
75 head.unsubscribe(clientId, topic);
76 } else {
77 LOG(ERROR) << "MQTT Broker: Unsubscribe: Wrong '#' placement: " << topic;
78 }
79 }
80
81 void SubscribtionTree::unsubscribe(const std::string& clientId) {
82 head.unsubscribe(clientId);
83 }
84
85 SubscribtionTree::TopicLevel::TopicLevel(Broker* broker, const std::string& topicLevel)
86 : broker(broker)
87 , topicLevel(topicLevel) {
88 }
89
90 void SubscribtionTree::TopicLevel::appear(const std::string& clientId, const std::string& topic) {
91 if (clientIds.contains(clientId)) {
92 broker->appear(clientId, topic, clientIds[clientId]);
93 }
94
95 for (auto& [topicLevel, subscribtion] : topicLevels) {
96 subscribtion.appear(clientId, std::string(topic).append(topic.empty() ? "" : "/").append(topicLevel));
97 }
98 }
99
100 bool SubscribtionTree::TopicLevel::subscribe(const std::string& clientId, uint8_t qoS, std::string topic) {
101 if (topic.empty()) {
102 LOG(INFO) << "MQTT Broker: Subscribe";
103 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
104
105 clientIds[clientId] = qoS;
106 } else {
107 const std::string topicLevel = topic.substr(0, topic.find('/'));
108
109 topic.erase(0, topicLevel.size() + 1);
110
111 const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscribtionTree::TopicLevel(broker, topicLevel)});
112
113 if (!it->second.subscribe(clientId, qoS, topic)) {
114 LOG(DEBUG) << "MQTT Broker: Erase topic: " << topicLevel << " /" << topic;
115
116 topicLevels.erase(it);
117 } else {
118 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel << " /" << topic;
119 }
120 }
121
122 return !topicLevels.empty() || !clientIds.empty();
123 }
124
125 void SubscribtionTree::TopicLevel::publish(Message& message, std::string topic) {
126 if (topic.empty()) {
127 LOG(INFO) << "MQTT Broker: Found match:";
128 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "';";
129 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
130
131 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
132 for (auto& [clientId, clientQoS] : clientIds) {
133 broker->sendPublish(clientId, message, clientQoS, false);
134 }
135 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
136
137 const auto nextHashLevel = topicLevels.find("#");
138 if (nextHashLevel != topicLevels.end()) {
139 LOG(INFO) << "MQTT Broker: Found parent match:";
140 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
141 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
142
143 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
144 for (auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
145 broker->sendPublish(clientId, message, clientQoS, false);
146 }
147 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
148 }
149 } else {
150 const std::string topicLevel = topic.substr(0, topic.find('/'));
151
152 topic.erase(0, topicLevel.size() + 1);
153
154 auto foundNode = topicLevels.find(topicLevel);
155 if (foundNode != topicLevels.end()) {
156 foundNode->second.publish(message, topic);
157 }
158
159 foundNode = topicLevels.find("+");
160 if (foundNode != topicLevels.end()) {
161 foundNode->second.publish(message, topic);
162 }
163
164 foundNode = topicLevels.find("#");
165 if (foundNode != topicLevels.end()) {
166 LOG(INFO) << "MQTT Broker: Found match:";
167 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
168 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
169
170 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() << "' ...";
171 for (auto& [clientId, clientQoS] : foundNode->second.clientIds) {
172 broker->sendPublish(clientId, message, clientQoS, false);
173 }
174 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
175 }
176 }
177 }
178
179 bool SubscribtionTree::TopicLevel::unsubscribe(const std::string& clientId, std::string topic) {
180 if (topic.empty()) {
181 if (clientIds.erase(clientId) != 0) {
182 LOG(INFO) << "MQTT Broker: Unsubscribe";
183 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
184 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
185 }
186 } else {
187 const std::string topicLevel = topic.substr(0, topic.find('/'));
188
189 auto&& it = topicLevels.find(topicLevel);
190 if (it != topicLevels.end()) {
191 topic.erase(0, topicLevel.size() + 1);
192
193 if (it->second.unsubscribe(clientId, topic)) {
194 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
195
196 topicLevels.erase(it);
197 }
198 }
199 }
200
201 return clientIds.empty() && topicLevels.empty();
202 }
203
204 bool SubscribtionTree::TopicLevel::unsubscribe(const std::string& clientId) {
205 if (clientIds.erase(clientId) != 0) {
206 LOG(INFO) << "MQTT Broker: Unsubscribe";
207 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
208 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
209 }
210
211 for (auto it = topicLevels.begin(); it != topicLevels.end();) {
212 if (it->second.unsubscribe(clientId)) {
213 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
214
215 it = topicLevels.erase(it);
216 } else {
217 ++it;
218 }
219 }
220
221 return clientIds.empty() && topicLevels.empty();
222 }
223
225 nlohmann::json json;
226
227 for (const auto& [topicLevelName, topicLevel] : topicLevels) {
228 json["topic_filter"][topicLevelName] = topicLevel.toJson();
229 }
230
231 for (const auto& [subscriber, qoS] : clientIds) {
232 json["qos_map"][subscriber] = qoS;
233 }
234
235 return json;
236 }
237
239 clientIds.clear();
240 topicLevels.clear();
241
242 if (json.contains("qos_map")) {
243 for (const auto& subscriber : json["qos_map"].items()) {
244 clientIds.emplace(subscriber.key(), subscriber.value());
245 }
246 }
247
248 if (json.contains("topic_filter")) {
249 for (const auto& topicLevelItem : json["topic_filter"].items()) {
250 topicLevels.emplace(topicLevelItem.key(), TopicLevel(broker, topicLevelItem.key()).fromJson(topicLevelItem.value()));
251 }
252 }
253
254 return *this;
255 }
256
258 *this = TopicLevel(broker, "");
259 }
260
261 void SubscribtionTree::fromJson(const nlohmann::json& json) {
262 if (!json.empty()) {
263 head.fromJson(json);
264 }
265 }
266
268 head.clear();
269 }
270
272 return head.toJson();
273 }
274
275} // namespace iot::mqtt::server::broker
void appear(const std::string &clientId, const std::string &topic)
TopicLevel & fromJson(const nlohmann::json &json)
bool unsubscribe(const std::string &clientId, std::string topic)
bool subscribe(const std::string &clientId, uint8_t qoS, std::string topic)
void fromJson(const nlohmann::json &json)
void unsubscribe(const std::string &clientId)
void appear(const std::string &clientId)
void unsubscribe(const std::string &topic, const std::string &clientId)
bool subscribe(const std::string &topic, const std::string &clientId, uint8_t qoS)