// Excerpt of the top-level subscribe path; the enclosing signature is not part of this excerpt.
// Count how many '#' characters the topic filter contains.
51 auto hashCount = std::ranges::count(topic,
'#');
// Accept a non-empty filter that has either no '#' at all, or exactly one '#' as its last character.
// NOTE(review): only the final character is tested, so a filter such as "a#" also passes;
// confirm whether '#' is required to occupy a whole topic level.
52 if (!topic.empty() && (hashCount == 0 || (hashCount == 1 && topic.ends_with(
'#')))) {
// Accepted filter: delegate to `head` with the client id, QoS and the full topic.
53 head.subscribe(clientId, qoS, topic);
// Error log for a rejected filter; presumably the else branch of the check above
// (the branch line itself is not visible here).
57 LOG(ERROR) <<
"MQTT Broker: Subscribe: Wrong '#' placement: " << topic;
// Excerpt of the recursive "appear" step; the enclosing signature is not part of this excerpt.
// If this node has an entry for clientId, report it to the broker together with the
// accumulated topic and the value stored for that client.
// NOTE(review): contains() followed by operator[] looks the key up twice; a single find() would suffice.
91 if (clientIds.contains(clientId)) {
92 broker->appear(clientId, topic, clientIds[clientId]);
// Recurse into every child level, extending the topic by the child's key.
// A "/" separator is inserted only when the accumulated topic is non-empty.
95 for (
auto& [topicLevel, subscribtion] : topicLevels) {
96 subscribtion.appear(clientId, std::string(topic).append(topic.empty() ?
"" :
"/").append(topicLevel));
// Excerpt of the recursive subscribe step; the enclosing signature and the branch lines
// separating the two parts below are not part of this excerpt.
// Part 1: log the subscription and store qoS under clientId (operator[] overwrites an earlier value).
// NOTE(review): presumably reached once the remaining topic is empty; confirm against the full file.
102 LOG(INFO) <<
"MQTT Broker: Subscribe";
103 LOG(INFO) <<
"MQTT Broker: ClientId: " << clientId;
105 clientIds[clientId] = qoS;
// Part 2: split off the first topic level, i.e. the text before the first '/',
// or the whole string when there is no '/' (find() then yields npos).
107 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
// Remove that level plus one separator character from the remaining topic;
// erase() clamps the count at the end of the string.
109 topic.erase(0, topicLevel.size() + 1);
// Look up or create the child node for this level; insert() leaves an existing entry untouched.
// NOTE(review): the TopicLevel temporary is constructed even when the key already exists;
// try_emplace would avoid that.
111 const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscribtionTree::TopicLevel(broker, topicLevel)});
// Recurse with the remaining topic. A false result leads to the child being erased again
// (compare the return expression at the end of this excerpt: false means no children and no clients).
113 if (!it->second.subscribe(clientId, qoS, topic)) {
114 LOG(DEBUG) <<
"MQTT Broker: Erase topic: " << topicLevel <<
" /" << topic;
116 topicLevels.erase(it);
// Logged for the other outcome; presumably the else branch (branch line not visible here).
118 LOG(INFO) <<
"MQTT Broker: Topic: " << topicLevel <<
" /" << topic;
// True while this node still has child levels or subscribed clients.
122 return !topicLevels.empty() || !clientIds.empty();
// Excerpt of the recursive publish step; the enclosing signature and the branch lines
// separating the parts below are not part of this excerpt.
// Part 1: log the match and send the message to every client stored at this node,
// each with the QoS stored for that client. The meaning of the trailing `false`
// argument is defined by broker->sendPublish, which is not visible here.
// NOTE(review): presumably reached once the remaining topic is empty; confirm against the full file.
127 LOG(INFO) <<
"MQTT Broker: Found match:";
// NOTE(review): this literal ends with "';" while the corresponding log lines further down end with "'";
// the ';' inside the string looks like a typo.
128 LOG(INFO) <<
"MQTT Broker: Topic: '" << message.getTopic() <<
"';";
129 LOG(INFO) <<
"MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
131 LOG(INFO) <<
"MQTT Broker: Distribute PUBLISH for match ...";
132 for (
auto& [clientId, clientQoS] : clientIds) {
133 broker->sendPublish(clientId, message, clientQoS,
false);
135 LOG(INFO) <<
"MQTT Broker: ... distributing PUBLISH for match completed";
// Part 2: additionally look for a child keyed "#" directly below this node and,
// if present, deliver to that child's clients (logged as a "parent match").
137 const auto nextHashLevel = topicLevels.find(
"#");
138 if (nextHashLevel != topicLevels.end()) {
139 LOG(INFO) <<
"MQTT Broker: Found parent match:";
140 LOG(INFO) <<
"MQTT Broker: Topic: '" << message.getTopic() <<
"'";
141 LOG(INFO) <<
"MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
143 LOG(INFO) <<
"MQTT Broker: Distribute PUBLISH for match ...";
144 for (
auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
145 broker->sendPublish(clientId, message, clientQoS,
false);
147 LOG(INFO) <<
"MQTT Broker: ... distributing PUBLISH for match completed";
// Part 3: split off the first level of the remaining topic (text before the first '/',
// or the whole string when there is none) and strip it plus one separator character.
150 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
152 topic.erase(0, topicLevel.size() + 1);
// Descend into the child whose key equals this level, if there is one.
154 auto foundNode = topicLevels.find(topicLevel);
155 if (foundNode != topicLevels.end()) {
156 foundNode->second.publish(message, topic);
// Also descend into a child keyed "+", if there is one, with the same remaining topic.
// NOTE(review): the exact, "+" and "#" lookups each deliver independently, so a client reachable
// through more than one of them gets sendPublish once per match; confirm that this is intended.
159 foundNode = topicLevels.find(
"+");
160 if (foundNode != topicLevels.end()) {
161 foundNode->second.publish(message, topic);
// A child keyed "#" is not descended into: its clients receive the message directly,
// regardless of what remains of the topic.
164 foundNode = topicLevels.find(
"#");
165 if (foundNode != topicLevels.end()) {
166 LOG(INFO) <<
"MQTT Broker: Found match:";
167 LOG(INFO) <<
"MQTT Broker: Topic: '" << message.getTopic() <<
"'";
168 LOG(INFO) <<
"MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
170 LOG(INFO) <<
"MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() <<
"' ...";
171 for (
auto& [clientId, clientQoS] : foundNode->second.clientIds) {
172 broker->sendPublish(clientId, message, clientQoS,
false);
174 LOG(INFO) <<
"MQTT Broker: ... distributing PUBLISH for match completed";
// Excerpt of the topic-specific unsubscribe step; the enclosing signature and the branch
// lines separating the two parts below are not part of this excerpt.
// Part 1: remove clientId from this node and log only when an entry was actually erased.
// NOTE(review): `topicLevel` logged here cannot be the local declared further down, because that
// declaration comes later; presumably it is a member of the node. Confirm against the full file.
181 if (clientIds.erase(clientId) != 0) {
182 LOG(INFO) <<
"MQTT Broker: Unsubscribe";
183 LOG(INFO) <<
"MQTT Broker: ClientId: " << clientId;
184 LOG(INFO) <<
"MQTT Broker: Topic: " << topicLevel;
// Part 2: take the first level of the remaining topic (text before the first '/',
// or the whole string when there is none).
187 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
// Continue only when a child for that level exists; an unknown level is left alone.
189 auto&& it = topicLevels.find(topicLevel);
190 if (it != topicLevels.end()) {
// Strip the consumed level plus one separator character before recursing.
191 topic.erase(0, topicLevel.size() + 1);
// Recurse into the child; a true result leads to the child being erased
// (compare the return expression below: true means no clients and no children).
193 if (it->second.unsubscribe(clientId, topic)) {
194 LOG(DEBUG) <<
"MQTT Broker: Erase Topic: " << it->first;
196 topicLevels.erase(it);
// True when this node has neither subscribed clients nor child levels left.
201 return clientIds.empty() && topicLevels.empty();
// Excerpt of the unsubscribe step that takes only a client id; the enclosing signature
// is not part of this excerpt.
// Remove clientId from this node and log only when an entry was actually erased.
205 if (clientIds.erase(clientId) != 0) {
206 LOG(INFO) <<
"MQTT Broker: Unsubscribe";
207 LOG(INFO) <<
"MQTT Broker: ClientId: " << clientId;
208 LOG(INFO) <<
"MQTT Broker: Topic: " << topicLevel;
// Visit every child with an explicit iterator, since children may be erased during the walk.
// NOTE(review): the loop header has no increment, and the advance for children that are kept
// is not visible in this excerpt; confirm it exists in the full file.
211 for (
auto it = topicLevels.begin(); it != topicLevels.end();) {
// A true result from the child leads to its removal; erase() returns the iterator to continue from.
212 if (it->second.unsubscribe(clientId)) {
213 LOG(DEBUG) <<
"MQTT Broker: Erase Topic: " << it->first;
215 it = topicLevels.erase(it);
// True when this node has neither subscribed clients nor child levels left.
221 return clientIds.empty() && topicLevels.empty();