SNode.C
Loading...
Searching...
No Matches
SubscribtionTree.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "iot/mqtt/server/broker/SubscribtionTree.h"
43
44#include "iot/mqtt/Mqtt.h"
45#include "iot/mqtt/server/broker/Broker.h"
46
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
48
49#include "log/Logger.h"
50
51#include <algorithm>
52#include <nlohmann/json.hpp>
53#include <string>
54
55// IWYU pragma: no_include <nlohmann/detail/iterators/iteration_proxy.hpp>
56
57#endif // DOXYGEN_SHOULD_SKIP_THIS
58
59namespace iot::mqtt::server::broker {
60
61 SubscribtionTree::SubscribtionTree(iot::mqtt::server::broker::Broker* broker)
62 : head(broker, "") {
63 }
64
65 void SubscribtionTree::appear(const std::string& clientId) {
66 head.appear(clientId, "");
67 }
68
69 bool SubscribtionTree::subscribe(const std::string& topic, const std::string& clientId, uint8_t qoS) {
70 bool success = false;
71
72 auto hashCount = std::ranges::count(topic, '#');
73 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with('#')))) {
74 head.subscribe(clientId, qoS, topic);
75
76 success = true;
77 } else {
78 LOG(ERROR) << "MQTT Broker: Subscribe: Wrong '#' placement: " << topic;
79 }
80
81 return success;
82 }
83
84 void SubscribtionTree::publish(Message&& message) {
85 auto hashCount = std::ranges::count(message.getTopic(), '#');
86 if (!message.getTopic().empty() && (hashCount == 0 || (hashCount == 1 && message.getTopic().ends_with('#')))) {
87 head.publish(message, message.getTopic());
88 } else {
89 LOG(ERROR) << "MQTT Broker: Publish: Wrong '#' placement: " << message.getTopic();
90 }
91 }
92
93 void SubscribtionTree::unsubscribe(const std::string& topic, const std::string& clientId) {
94 auto hashCount = std::ranges::count(topic, '#');
95 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with('#')))) {
96 head.unsubscribe(clientId, topic);
97 } else {
98 LOG(ERROR) << "MQTT Broker: Unsubscribe: Wrong '#' placement: " << topic;
99 }
100 }
101
102 void SubscribtionTree::unsubscribe(const std::string& clientId) {
103 head.unsubscribe(clientId);
104 }
105
106 std::list<std::string> SubscribtionTree::getSubscriptions(const std::string& clientId) const {
107 std::list<std::string> subscriptions = head.getSubscriptions(clientId);
108
109 return subscriptions;
110 }
111
112 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> SubscribtionTree::getSubscriptionTree() const {
114 }
115
116 void SubscribtionTree::fromJson(const nlohmann::json& json) {
117 if (!json.empty()) {
118 head.fromJson(json);
119 }
120 }
121
122 nlohmann::json SubscribtionTree::toJson() const {
123 return head.toJson();
124 }
125
128 }
129
130 SubscribtionTree::TopicLevel::TopicLevel(Broker* broker, const std::string& topicLevel)
131 : broker(broker)
132 , topicLevel(topicLevel) {
133 }
134
135 void SubscribtionTree::TopicLevel::appear(const std::string& clientId, const std::string& topic) {
136 if (clientIds.contains(clientId)) {
137 broker->appear(clientId, topic, clientIds[clientId]);
138 }
139
140 for (auto& [topicLevel, subscribtion] : topicLevels) {
141 subscribtion.appear(clientId, std::string(topic).append(topic.empty() ? "" : "/").append(topicLevel));
142 }
143 }
144
145 bool SubscribtionTree::TopicLevel::subscribe(const std::string& clientId, uint8_t qoS, std::string topic) {
146 if (topic.empty()) {
147 LOG(INFO) << "MQTT Broker: Subscribe";
148 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
149
150 clientIds[clientId] = qoS;
151 } else {
152 const std::string topicLevel = topic.substr(0, topic.find('/'));
153
154 topic.erase(0, topicLevel.size() + 1);
155
156 const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscribtionTree::TopicLevel(broker, topicLevel)});
157
158 if (!it->second.subscribe(clientId, qoS, topic)) {
159 LOG(DEBUG) << "MQTT Broker: Erase topic: " << topicLevel << " /" << topic;
160
161 topicLevels.erase(it);
162 } else {
163 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel << " /" << topic;
164 }
165 }
166
167 return !topicLevels.empty() || !clientIds.empty();
168 }
169
170 void SubscribtionTree::TopicLevel::publish(Message& message, std::string topic) {
171 if (topic.empty()) {
172 LOG(INFO) << "MQTT Broker: Found match:";
173 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "';";
174 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
175
176 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
177 for (auto& [clientId, clientQoS] : clientIds) {
178 broker->sendPublish(clientId, message, clientQoS, false);
179 }
180 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
181
182 const auto nextHashLevel = topicLevels.find("#");
183 if (nextHashLevel != topicLevels.end()) {
184 LOG(INFO) << "MQTT Broker: Found parent match:";
185 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
186 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
187
188 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
189 for (auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
190 broker->sendPublish(clientId, message, clientQoS, false);
191 }
192 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
193 }
194 } else {
195 const std::string topicLevel = topic.substr(0, topic.find('/'));
196
197 topic.erase(0, topicLevel.size() + 1);
198
199 auto foundNode = topicLevels.find(topicLevel);
200 if (foundNode != topicLevels.end()) {
201 foundNode->second.publish(message, topic);
202 }
203
204 foundNode = topicLevels.find("+");
205 if (foundNode != topicLevels.end()) {
206 foundNode->second.publish(message, topic);
207 }
208
209 foundNode = topicLevels.find("#");
210 if (foundNode != topicLevels.end()) {
211 LOG(INFO) << "MQTT Broker: Found match:";
212 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
213 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
214
215 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() << "' ...";
216 for (auto& [clientId, clientQoS] : foundNode->second.clientIds) {
217 broker->sendPublish(clientId, message, clientQoS, false);
218 }
219 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
220 }
221 }
222 }
223
224 bool SubscribtionTree::TopicLevel::unsubscribe(const std::string& clientId, std::string topic) {
225 if (topic.empty()) {
226 if (clientIds.erase(clientId) != 0) {
227 LOG(INFO) << "MQTT Broker: Unsubscribe";
228 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
229 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
230 }
231 } else {
232 const std::string topicLevel = topic.substr(0, topic.find('/'));
233
234 auto&& it = topicLevels.find(topicLevel);
235 if (it != topicLevels.end()) {
236 topic.erase(0, topicLevel.size() + 1);
237
238 if (it->second.unsubscribe(clientId, topic)) {
239 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
240
241 topicLevels.erase(it);
242 }
243 }
244 }
245
246 return clientIds.empty() && topicLevels.empty();
247 }
248
249 bool SubscribtionTree::TopicLevel::unsubscribe(const std::string& clientId) {
250 if (clientIds.erase(clientId) != 0) {
251 LOG(INFO) << "MQTT Broker: Unsubscribe";
252 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
253 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
254 }
255
256 for (auto it = topicLevels.begin(); it != topicLevels.end();) {
257 if (it->second.unsubscribe(clientId)) {
258 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
259
260 it = topicLevels.erase(it);
261 } else {
262 ++it;
263 }
264 }
265
266 return clientIds.empty() && topicLevels.empty();
267 }
268
269 nlohmann::json SubscribtionTree::TopicLevel::toJson() const {
270 nlohmann::json json;
271
272 for (const auto& [topicLevelName, topicLevel] : topicLevels) {
273 json["topic_filter"][topicLevelName] = topicLevel.toJson();
274 }
275
276 for (const auto& [subscriber, qoS] : clientIds) {
277 json["qos_map"][subscriber] = qoS;
278 }
279
280 return json;
281 }
282
283 std::list<std::string> SubscribtionTree::TopicLevel::getSubscriptions(const std::string& clientId) const {
284 return getSubscriptions("", clientId);
285 }
286
287 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> SubscribtionTree::TopicLevel::getSubscriptionTree() const {
288 return getSubscriptionTree("");
289 }
290
291 std::list<std::string> SubscribtionTree::TopicLevel::getSubscriptions(const std::string& absoluteTopicLevel,
292 const std::string& clientId) const {
293 std::list<std::string> topicLevelList;
294
295 for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
296 const std::string currentAbsoluteTopicLevel = absoluteTopicLevel + topicLevelName;
297
298 if (nextTopicLevel.clientIds.contains(clientId)) {
299 topicLevelList.push_back(currentAbsoluteTopicLevel);
300 }
301
302 topicLevelList.splice(topicLevelList.end(), nextTopicLevel.getSubscriptions(currentAbsoluteTopicLevel + "/", clientId));
303 }
304
305 return topicLevelList;
306 }
307
308 std::map<std::string, std::list<std::pair<std::string, uint8_t>>>
309 SubscribtionTree::TopicLevel::getSubscriptionTree(const std::string& absoluteTopicLevel) const {
310 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> topicLevelTree;
311
312 for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
313 const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;
314
315 for (const auto& clientId : nextTopicLevel.clientIds) {
316 topicLevelTree[composedAbsoluteTopicLevelName].emplace_back(clientId);
317 }
318
319 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> subSubscriptionTree =
320 nextTopicLevel.getSubscriptionTree(composedAbsoluteTopicLevelName + "/");
321
322 topicLevelTree.insert(subSubscriptionTree.begin(), subSubscriptionTree.end());
323 }
324
325 return topicLevelTree;
326 }
327
329 clientIds.clear();
330 topicLevels.clear();
331
332 if (json.contains("qos_map")) {
333 for (const auto& subscriber : json["qos_map"].items()) {
334 clientIds.emplace(subscriber.key(), subscriber.value());
335 }
336 }
337
338 if (json.contains("topic_filter")) {
339 for (const auto& topicLevelItem : json["topic_filter"].items()) {
340 topicLevels.emplace(topicLevelItem.key(), TopicLevel(broker, topicLevelItem.key()).fromJson(topicLevelItem.value()));
341 }
342 }
343
344 return *this;
345 }
346
348 *this = TopicLevel(broker, "");
349 }
350
351} // namespace iot::mqtt::server::broker
static std::string toHexString(const std::string &data)
Definition Mqtt.cpp:394
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
Definition Broker.cpp:146
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
Definition Broker.cpp:237
const std::string & getTopic() const
Definition Message.cpp:66
const std::string & getMessage() const
Definition Message.cpp:74
void appear(const std::string &clientId, const std::string &topic)
void publish(Message &message, std::string topic)
TopicLevel & fromJson(const nlohmann::json &json)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree(const std::string &absoluteTopicLevel) const
TopicLevel(iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const
bool unsubscribe(const std::string &clientId, std::string topic)
std::list< std::string > getSubscriptions(const std::string &clientId) const
bool subscribe(const std::string &clientId, uint8_t qoS, std::string topic)
std::list< std::string > getSubscriptions(const std::string &absoluteTopicLevel, const std::string &clientId) const
std::list< std::string > getSubscriptions(const std::string &clientId) const
void fromJson(const nlohmann::json &json)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const
void unsubscribe(const std::string &clientId)
void appear(const std::string &clientId)
void unsubscribe(const std::string &topic, const std::string &clientId)
SubscribtionTree(iot::mqtt::server::broker::Broker *broker)
bool subscribe(const std::string &topic, const std::string &clientId, uint8_t qoS)