2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
42#include "iot/mqtt/server/broker/SubscribtionTree.h"
44#include "iot/mqtt/Mqtt.h"
45#include "iot/mqtt/server/broker/Broker.h"
47#ifndef DOXYGEN_SHOULD_SKIP_THIS
49#include "log/Logger.h"
52#include <nlohmann/json.hpp>
59namespace iot::mqtt::server::
broker {
72 auto hashCount = std::ranges::count(topic,
'#');
73 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with(
'#')))) {
78 LOG(ERROR) <<
"MQTT Broker: Subscribe: Wrong '#' placement: " << topic;
85 auto hashCount = std::ranges::count(message
.getTopic(),
'#');
86 if (!message
.getTopic().empty() && (hashCount == 0 || (hashCount == 1 && message
.getTopic().ends_with(
'#')))) {
89 LOG(ERROR) <<
"MQTT Broker: Publish: Wrong '#' placement: " << message
.getTopic();
94 auto hashCount = std::ranges::count(topic,
'#');
95 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with(
'#')))) {
98 LOG(ERROR) <<
"MQTT Broker: Unsubscribe: Wrong '#' placement: " << topic;
109 return subscriptions;
140 for (
auto& [topicLevel, subscribtion] :
topicLevels) {
141 subscribtion
.appear(clientId
, std::string(topic).append(topic.empty() ?
"" :
"/").append(topicLevel)
);
147 LOG(INFO) <<
"MQTT Broker: Subscribe";
148 LOG(INFO) <<
"MQTT Broker: ClientId: " << clientId;
152 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
154 topic.erase(0, topicLevel.size() + 1);
159 LOG(DEBUG) <<
"MQTT Broker: Erase topic: " << topicLevel <<
" /" << topic;
163 LOG(INFO) <<
"MQTT Broker: Topic: " << topicLevel <<
" /" << topic;
172 LOG(INFO) <<
"MQTT Broker: Found match:";
173 LOG(INFO) <<
"MQTT Broker: Topic: '" << message
.getTopic() <<
"';";
176 LOG(INFO) <<
"MQTT Broker: Distribute PUBLISH for match ...";
177 for (
auto& [clientId, clientQoS] :
clientIds) {
180 LOG(INFO) <<
"MQTT Broker: ... distributing PUBLISH for match completed";
184 LOG(INFO) <<
"MQTT Broker: Found parent match:";
185 LOG(INFO) <<
"MQTT Broker: Topic: '" << message
.getTopic() <<
"'";
188 LOG(INFO) <<
"MQTT Broker: Distribute PUBLISH for match ...";
189 for (
auto& [clientId, clientQoS] : nextHashLevel->second
.clientIds) {
192 LOG(INFO) <<
"MQTT Broker: ... distributing PUBLISH for match completed";
195 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
197 topic.erase(0, topicLevel.size() + 1);
211 LOG(INFO) <<
"MQTT Broker: Found match:";
212 LOG(INFO) <<
"MQTT Broker: Topic: '" << message
.getTopic() <<
"'";
215 LOG(INFO) <<
"MQTT Broker: Distribute PUBLISH for match '" << message
.getTopic() <<
"' ...";
216 for (
auto& [clientId, clientQoS] : foundNode->second
.clientIds) {
219 LOG(INFO) <<
"MQTT Broker: ... distributing PUBLISH for match completed";
227 LOG(INFO) <<
"MQTT Broker: Unsubscribe";
228 LOG(INFO) <<
"MQTT Broker: ClientId: " << clientId;
229 LOG(INFO) <<
"MQTT Broker: Topic: " <<
topicLevel;
232 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
236 topic.erase(0, topicLevel.size() + 1);
239 LOG(DEBUG) <<
"MQTT Broker: Erase Topic: " << it->first;
251 LOG(INFO) <<
"MQTT Broker: Unsubscribe";
252 LOG(INFO) <<
"MQTT Broker: ClientId: " << clientId;
253 LOG(INFO) <<
"MQTT Broker: Topic: " <<
topicLevel;
258 LOG(DEBUG) <<
"MQTT Broker: Erase Topic: " << it->first;
272 for (
const auto& [topicLevelName, topicLevel] :
topicLevels) {
273 json[
"topic_filter"][topicLevelName] = topicLevel
.toJson();
276 for (
const auto& [subscriber, qoS] :
clientIds) {
277 json[
"qos_map"][subscriber] = qoS;
292 const std::string& clientId)
const {
293 std::list<std::string> topicLevelList;
295 for (
const auto& [topicLevelName, nextTopicLevel] :
topicLevels) {
296 const std::string currentAbsoluteTopicLevel = absoluteTopicLevel + topicLevelName;
298 if (nextTopicLevel
.clientIds.contains(clientId)) {
299 topicLevelList.push_back(currentAbsoluteTopicLevel);
302 topicLevelList.splice(topicLevelList.end(), nextTopicLevel
.getSubscriptions(currentAbsoluteTopicLevel +
"/", clientId
));
305 return topicLevelList;
308 std::map<std::string, std::list<std::pair<std::string, uint8_t>>>
310 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> topicLevelTree;
312 for (
const auto& [topicLevelName, nextTopicLevel] :
topicLevels) {
313 const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;
315 for (
const auto& clientId : nextTopicLevel
.clientIds) {
316 topicLevelTree[composedAbsoluteTopicLevelName].emplace_back(clientId);
319 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> subSubscriptionTree =
322 topicLevelTree.insert(subSubscriptionTree.begin(), subSubscriptionTree.end());
325 return topicLevelTree;
332 if (json.contains(
"qos_map")) {
333 for (
const auto& subscriber : json[
"qos_map"].items()) {
334 clientIds.emplace(subscriber.key(), subscriber.value());
338 if (json.contains(
"topic_filter")) {
339 for (
const auto& topicLevelItem : json[
"topic_filter"].items()) {
static std::string toHexString(const std::string &data)
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
const std::string & getTopic() const
const std::string & getMessage() const
std::map< std::string, TopicLevel > topicLevels
nlohmann::json toJson() const
void appear(const std::string &clientId, const std::string &topic)
void publish(Message &message, std::string topic)
TopicLevel & fromJson(const nlohmann::json &json)
std::map< std::string, uint8_t > clientIds
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree(const std::string &absoluteTopicLevel) const
TopicLevel(iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const
bool unsubscribe(const std::string &clientId, std::string topic)
bool unsubscribe(const std::string &clientId)
std::list< std::string > getSubscriptions(const std::string &clientId) const
bool subscribe(const std::string &clientId, uint8_t qoS, std::string topic)
std::list< std::string > getSubscriptions(const std::string &absoluteTopicLevel, const std::string &clientId) const
nlohmann::json toJson() const
std::list< std::string > getSubscriptions(const std::string &clientId) const
void fromJson(const nlohmann::json &json)
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const
void publish(Message &&message)
void unsubscribe(const std::string &clientId)
void appear(const std::string &clientId)
void unsubscribe(const std::string &topic, const std::string &clientId)
SubscribtionTree(iot::mqtt::server::broker::Broker *broker)
bool subscribe(const std::string &topic, const std::string &clientId, uint8_t qoS)