SNode.C
Loading...
Searching...
No Matches
SubscriptionTree.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "iot/mqtt/server/broker/SubscriptionTree.h"
43
44#include "iot/mqtt/Mqtt.h"
45#include "iot/mqtt/server/broker/Broker.h"
46
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
48
49#include "log/Logger.h"
50
51#include <algorithm>
52#include <nlohmann/json.hpp>
53#include <string>
54
55// IWYU pragma: no_include <nlohmann/detail/iterators/iteration_proxy.hpp>
56
57#endif // DOXYGEN_SHOULD_SKIP_THIS
58
59namespace iot::mqtt::server::broker {
60
61 SubscriptionTree::SubscriptionTree(iot::mqtt::server::broker::Broker* broker)
62 : head(broker, "") {
63 }
64
65 void SubscriptionTree::appear(const std::string& clientId) {
66 head.appear(clientId, "");
67 }
68
69 bool SubscriptionTree::subscribe(const std::string& topic, const std::string& clientId, uint8_t qoS) {
70 bool success = false;
71
72 auto hashCount = std::ranges::count(topic, '#');
73 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with('#')))) {
74 head.subscribe(clientId, qoS, topic);
75
76 success = true;
77 } else {
78 LOG(ERROR) << "MQTT Broker: Subscribe: Wrong '#' placement: " << topic;
79 }
80
81 return success;
82 }
83
84 void SubscriptionTree::publish(Message&& message) {
85 auto hashCount = std::ranges::count(message.getTopic(), '#');
86 if (!message.getTopic().empty() && (hashCount == 0 || (hashCount == 1 && message.getTopic().ends_with('#')))) {
87 head.publish(message, message.getTopic());
88 } else {
89 LOG(ERROR) << "MQTT Broker: Publish: Wrong '#' placement: " << message.getTopic();
90 }
91 }
92
93 void SubscriptionTree::unsubscribe(const std::string& topic, const std::string& clientId) {
94 auto hashCount = std::ranges::count(topic, '#');
95 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with('#')))) {
96 head.unsubscribe(clientId, topic);
97 } else {
98 LOG(ERROR) << "MQTT Broker: Unsubscribe: Wrong '#' placement: " << topic;
99 }
100 }
101
102 void SubscriptionTree::unsubscribe(const std::string& clientId) {
103 head.unsubscribe(clientId);
104 }
105
106 std::list<std::string> SubscriptionTree::getSubscriptions(const std::string& clientId) const {
107 return head.getSubscriptions(clientId);
108 }
109
110 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> SubscriptionTree::getSubscriptionTree() const {
112 }
113
114 void SubscriptionTree::fromJson(const nlohmann::json& json) {
115 if (!json.empty()) {
116 head.fromJson(json);
117 }
118 }
119
120 nlohmann::json SubscriptionTree::toJson() const {
121 return head.toJson();
122 }
123
126 }
127
128 SubscriptionTree::TopicLevel::TopicLevel(Broker* broker, const std::string& topicLevel)
129 : broker(broker)
130 , topicLevel(topicLevel) {
131 }
132
133 void SubscriptionTree::TopicLevel::appear(const std::string& clientId, const std::string& topic) {
134 if (clientIds.contains(clientId)) {
135 broker->appear(clientId, topic, clientIds[clientId]);
136 }
137
138 for (auto& [topicLevel, subscription] : topicLevels) {
139 subscription.appear(clientId, std::string(topic).append(topic.empty() ? "" : "/").append(topicLevel));
140 }
141 }
142
143 bool SubscriptionTree::TopicLevel::subscribe(const std::string& clientId, uint8_t qoS, std::string topic) {
144 if (topic.empty()) {
145 LOG(INFO) << "MQTT Broker: Subscribe";
146 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
147
148 clientIds[clientId] = qoS;
149 } else {
150 const std::string topicLevel = topic.substr(0, topic.find('/'));
151
152 topic.erase(0, topicLevel.size() + 1);
153
154 const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscriptionTree::TopicLevel(broker, topicLevel)});
155
156 if (!it->second.subscribe(clientId, qoS, topic)) {
157 LOG(DEBUG) << "MQTT Broker: Erase topic: " << topicLevel << " /" << topic;
158
159 topicLevels.erase(it);
160 } else {
161 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel << " /" << topic;
162 }
163 }
164
165 return !topicLevels.empty() || !clientIds.empty();
166 }
167
168 void SubscriptionTree::TopicLevel::publish(Message& message, std::string topic) {
169 if (topic.empty()) {
170 LOG(INFO) << "MQTT Broker: Found match:";
171 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "';";
172 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
173
174 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
175 for (auto& [clientId, clientQoS] : clientIds) {
176 broker->sendPublish(clientId, message, clientQoS, false);
177 }
178 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
179
180 const auto nextHashLevel = topicLevels.find("#");
181 if (nextHashLevel != topicLevels.end()) {
182 LOG(INFO) << "MQTT Broker: Found parent match:";
183 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
184 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
185
186 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
187 for (auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
188 broker->sendPublish(clientId, message, clientQoS, false);
189 }
190 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
191 }
192 } else {
193 const std::string topicLevel = topic.substr(0, topic.find('/'));
194
195 topic.erase(0, topicLevel.size() + 1);
196
197 auto foundNode = topicLevels.find(topicLevel);
198 if (foundNode != topicLevels.end()) {
199 foundNode->second.publish(message, topic);
200 }
201
202 foundNode = topicLevels.find("+");
203 if (foundNode != topicLevels.end()) {
204 foundNode->second.publish(message, topic);
205 }
206
207 foundNode = topicLevels.find("#");
208 if (foundNode != topicLevels.end()) {
209 LOG(INFO) << "MQTT Broker: Found match:";
210 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
211 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
212
213 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() << "' ...";
214 for (auto& [clientId, clientQoS] : foundNode->second.clientIds) {
215 broker->sendPublish(clientId, message, clientQoS, false);
216 }
217 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
218 }
219 }
220 }
221
222 bool SubscriptionTree::TopicLevel::unsubscribe(const std::string& clientId, std::string topic) {
223 if (topic.empty()) {
224 if (clientIds.erase(clientId) != 0) {
225 LOG(INFO) << "MQTT Broker: Unsubscribe";
226 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
227 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
228 }
229 } else {
230 const std::string topicLevel = topic.substr(0, topic.find('/'));
231
232 auto&& it = topicLevels.find(topicLevel);
233 if (it != topicLevels.end()) {
234 topic.erase(0, topicLevel.size() + 1);
235
236 if (it->second.unsubscribe(clientId, topic)) {
237 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
238
239 topicLevels.erase(it);
240 }
241 }
242 }
243
244 return clientIds.empty() && topicLevels.empty();
245 }
246
247 bool SubscriptionTree::TopicLevel::unsubscribe(const std::string& clientId) {
248 if (clientIds.erase(clientId) != 0) {
249 LOG(INFO) << "MQTT Broker: Unsubscribe";
250 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
251 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
252 }
253
254 for (auto it = topicLevels.begin(); it != topicLevels.end();) {
255 if (it->second.unsubscribe(clientId)) {
256 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
257
258 it = topicLevels.erase(it);
259 } else {
260 ++it;
261 }
262 }
263
264 return clientIds.empty() && topicLevels.empty();
265 }
266
267 nlohmann::json SubscriptionTree::TopicLevel::toJson() const {
268 nlohmann::json json;
269
270 for (const auto& [topicLevelName, topicLevel] : topicLevels) {
271 json["topic_filter"][topicLevelName] = topicLevel.toJson();
272 }
273
274 for (const auto& [subscriber, qoS] : clientIds) {
275 json["qos_map"][subscriber] = qoS;
276 }
277
278 return json;
279 }
280
281 std::list<std::string> SubscriptionTree::TopicLevel::getSubscriptions(const std::string& clientId) const {
282 return getSubscriptions("", clientId);
283 }
284
285 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> SubscriptionTree::TopicLevel::getSubscriptionTree() const {
286 return getSubscriptionTree("");
287 }
288
289 std::list<std::string> SubscriptionTree::TopicLevel::getSubscriptions(const std::string& absoluteTopicLevel,
290 const std::string& clientId) const {
291 std::list<std::string> topicLevelList;
292
293 for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
294 const std::string currentAbsoluteTopicLevel = absoluteTopicLevel + topicLevelName;
295
296 if (nextTopicLevel.clientIds.contains(clientId)) {
297 topicLevelList.push_back(currentAbsoluteTopicLevel);
298 }
299
300 topicLevelList.splice(topicLevelList.end(), nextTopicLevel.getSubscriptions(currentAbsoluteTopicLevel + "/", clientId));
301 }
302
303 return topicLevelList;
304 }
305
306 std::map<std::string, std::list<std::pair<std::string, uint8_t>>>
307 SubscriptionTree::TopicLevel::getSubscriptionTree(const std::string& absoluteTopicLevel) const {
308 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> topicLevelTree;
309
310 for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
311 const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;
312
313 for (const auto& clientId : nextTopicLevel.clientIds) {
314 topicLevelTree[composedAbsoluteTopicLevelName].emplace_back(clientId);
315 }
316
317 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> subSubscriptionTree =
318 nextTopicLevel.getSubscriptionTree(composedAbsoluteTopicLevelName + "/");
319
320 topicLevelTree.insert(subSubscriptionTree.begin(), subSubscriptionTree.end());
321 }
322
323 return topicLevelTree;
324 }
325
327 clientIds.clear();
328 topicLevels.clear();
329
330 if (json.contains("qos_map")) {
331 for (const auto& subscriber : json["qos_map"].items()) {
332 clientIds.emplace(subscriber.key(), subscriber.value());
333 }
334 }
335
336 if (json.contains("topic_filter")) {
337 for (const auto& topicLevelItem : json["topic_filter"].items()) {
338 topicLevels.emplace(topicLevelItem.key(), TopicLevel(broker, topicLevelItem.key()).fromJson(topicLevelItem.value()));
339 }
340 }
341
342 return *this;
343 }
344
346 *this = TopicLevel(broker, "");
347 }
348
349} // namespace iot::mqtt::server::broker
static std::string toHexString(const std::string &data)
Definition Mqtt.cpp:395
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
Definition Broker.cpp:198
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
Definition Broker.cpp:241
const std::string & getTopic() const
Definition Message.cpp:66
const std::string & getMessage() const
Definition Message.cpp:74
TopicLevel & fromJson(const nlohmann::json &json)
bool subscribe(const std::string &clientId, uint8_t qoS, std::string topic)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const
void publish(Message &message, std::string topic)
TopicLevel(iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)
std::list< std::string > getSubscriptions(const std::string &absoluteTopicLevel, const std::string &clientId) const
bool unsubscribe(const std::string &clientId, std::string topic)
void appear(const std::string &clientId, const std::string &topic)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree(const std::string &absoluteTopicLevel) const
std::list< std::string > getSubscriptions(const std::string &clientId) const
void unsubscribe(const std::string &topic, const std::string &clientId)
void fromJson(const nlohmann::json &json)
void unsubscribe(const std::string &clientId)
std::list< std::string > getSubscriptions(const std::string &clientId) const
bool subscribe(const std::string &topic, const std::string &clientId, uint8_t qoS)
void appear(const std::string &clientId)
SubscriptionTree(iot::mqtt::server::broker::Broker *broker)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const