SNode.C
iot::mqtt::server::broker::SubscriptionTree::TopicLevel Class Reference

Public Member Functions

 TopicLevel (iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)
void appear (const std::string &clientId, const std::string &topic)
bool subscribe (const std::string &clientId, uint8_t qoS, std::string topic)
void publish (Message &message, std::string topic)
bool unsubscribe (const std::string &clientId, std::string topic)
bool unsubscribe (const std::string &clientId)
std::list< std::string > getSubscriptions (const std::string &clientId) const
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree () const
TopicLevel & fromJson (const nlohmann::json &json)
nlohmann::json toJson () const
void clear ()

Private Member Functions

std::list< std::string > getSubscriptions (const std::string &absoluteTopicLevel, const std::string &clientId) const
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree (const std::string &absoluteTopicLevel) const

Private Attributes

iot::mqtt::server::broker::Broker * broker
std::map< std::string, uint8_t > clientIds
std::map< std::string, TopicLevel > topicLevels
std::string topicLevel

Detailed Description

Definition at line 87 of file SubscriptionTree.h.

Constructor & Destructor Documentation

◆ TopicLevel()

iot::mqtt::server::broker::SubscriptionTree::TopicLevel::TopicLevel ( iot::mqtt::server::broker::Broker * broker,
const std::string & topicLevel )
explicit

Definition at line 128 of file SubscriptionTree.cpp.

References topicLevel.

Referenced by clear(), fromJson(), subscribe(), and iot::mqtt::server::broker::SubscriptionTree::SubscriptionTree().

Member Function Documentation

◆ appear()

void iot::mqtt::server::broker::SubscriptionTree::TopicLevel::appear ( const std::string & clientId,
const std::string & topic )

Definition at line 133 of file SubscriptionTree.cpp.

void SubscriptionTree::TopicLevel::appear(const std::string& clientId, const std::string& topic) {
    if (clientIds.contains(clientId)) {
        broker->appear(clientId, topic, clientIds[clientId]);
    }

    for (auto& [topicLevel, subscription] : topicLevels) {
        subscription.appear(clientId, std::string(topic).append(topic.empty() ? "" : "/").append(topicLevel));
    }
}

References iot::mqtt::server::broker::Broker::appear(), appear(), clientIds, and topicLevels.

Referenced by iot::mqtt::server::broker::SubscriptionTree::appear(), and appear().

◆ clear()

void iot::mqtt::server::broker::SubscriptionTree::TopicLevel::clear ( )

Definition at line 345 of file SubscriptionTree.cpp.

void SubscriptionTree::TopicLevel::clear() {
    *this = TopicLevel(broker, "");
}

References TopicLevel().

Referenced by iot::mqtt::server::broker::SubscriptionTree::clear().

◆ fromJson()

SubscriptionTree::TopicLevel & iot::mqtt::server::broker::SubscriptionTree::TopicLevel::fromJson ( const nlohmann::json & json)

Definition at line 326 of file SubscriptionTree.cpp.

SubscriptionTree::TopicLevel& SubscriptionTree::TopicLevel::fromJson(const nlohmann::json& json) {
    clientIds.clear();
    topicLevels.clear();

    if (json.contains("qos_map")) {
        for (const auto& subscriber : json["qos_map"].items()) {
            clientIds.emplace(subscriber.key(), subscriber.value());
        }
    }

    if (json.contains("topic_filter")) {
        for (const auto& topicLevelItem : json["topic_filter"].items()) {
            topicLevels.emplace(topicLevelItem.key(), TopicLevel(broker, topicLevelItem.key()).fromJson(topicLevelItem.value()));
        }
    }

    return *this;
}

References clientIds, fromJson(), TopicLevel(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscriptionTree::fromJson(), and fromJson().

◆ getSubscriptions() [1/2]

std::list< std::string > iot::mqtt::server::broker::SubscriptionTree::TopicLevel::getSubscriptions ( const std::string & absoluteTopicLevel,
const std::string & clientId ) const
private

Definition at line 289 of file SubscriptionTree.cpp.

std::list<std::string> SubscriptionTree::TopicLevel::getSubscriptions(const std::string& absoluteTopicLevel,
                                                                      const std::string& clientId) const {
    std::list<std::string> topicLevelList;

    for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
        const std::string currentAbsoluteTopicLevel = absoluteTopicLevel + topicLevelName;

        if (nextTopicLevel.clientIds.contains(clientId)) {
            topicLevelList.push_back(currentAbsoluteTopicLevel);
        }

        topicLevelList.splice(topicLevelList.end(), nextTopicLevel.getSubscriptions(currentAbsoluteTopicLevel + "/", clientId));
    }

    return topicLevelList;
}

References clientIds, getSubscriptions(), and topicLevels.

Referenced by getSubscriptions(), and getSubscriptions().
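
The recursion above flattens the tree into a list of absolute topic filters by carrying the path prefix down each branch. A minimal standalone sketch of that traversal follows; `FilterNode` and `filtersOf` are hypothetical names chosen for illustration, and the node keeps only a set of client ids rather than the QoS map and broker pointer of the real class.

```cpp
#include <cassert>
#include <list>
#include <map>
#include <set>
#include <string>

// Hypothetical, simplified stand-in for TopicLevel (no QoS, no broker).
struct FilterNode {
    std::set<std::string> clients;
    std::map<std::string, FilterNode> children;
};

// Returns every absolute filter below `node` that `client` is registered on.
// `prefix` is the path accumulated so far, either empty or ending in '/'.
std::list<std::string> filtersOf(const FilterNode& node, const std::string& prefix, const std::string& client) {
    std::list<std::string> result;
    for (const auto& entry : node.children) {
        const std::string path = prefix + entry.first;
        if (entry.second.clients.count(client) != 0) {
            result.push_back(path);
        }
        // splice() moves the child's results to the end without copying them.
        result.splice(result.end(), filtersOf(entry.second, path + "/", client));
    }
    return result;
}

void filtersOfExample() {
    FilterNode root;
    root.children["home"].children["temp"].clients.insert("sensor-ui");
    assert(filtersOf(root, "", "sensor-ui").front() == "home/temp");
}
```

Because `std::map` iterates in key order, the resulting list is sorted by level name within each branch, with a parent filter listed before the filters below it.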

◆ getSubscriptions() [2/2]

std::list< std::string > iot::mqtt::server::broker::SubscriptionTree::TopicLevel::getSubscriptions ( const std::string & clientId) const

Definition at line 281 of file SubscriptionTree.cpp.

std::list<std::string> SubscriptionTree::TopicLevel::getSubscriptions(const std::string& clientId) const {
    return getSubscriptions("", clientId);
}

References getSubscriptions().

Referenced by iot::mqtt::server::broker::SubscriptionTree::getSubscriptions().

◆ getSubscriptionTree() [1/2]

std::map< std::string, std::list< std::pair< std::string, uint8_t > > > iot::mqtt::server::broker::SubscriptionTree::TopicLevel::getSubscriptionTree ( ) const

Definition at line 285 of file SubscriptionTree.cpp.

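std::map<std::string, std::list<std::pair<std::string, uint8_t>>> SubscriptionTree::TopicLevel::getSubscriptionTree() const {
    return getSubscriptionTree("");
}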

References getSubscriptionTree().

Referenced by iot::mqtt::server::broker::SubscriptionTree::getSubscriptionTree().

◆ getSubscriptionTree() [2/2]

std::map< std::string, std::list< std::pair< std::string, uint8_t > > > iot::mqtt::server::broker::SubscriptionTree::TopicLevel::getSubscriptionTree ( const std::string & absoluteTopicLevel) const
private

Definition at line 307 of file SubscriptionTree.cpp.

std::map<std::string, std::list<std::pair<std::string, uint8_t>>> SubscriptionTree::TopicLevel::getSubscriptionTree(const std::string& absoluteTopicLevel) const {
    std::map<std::string, std::list<std::pair<std::string, uint8_t>>> topicLevelTree;

    for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
        const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;

        for (const auto& clientId : nextTopicLevel.clientIds) {
            topicLevelTree[composedAbsoluteTopicLevelName].emplace_back(clientId);
        }

        std::map<std::string, std::list<std::pair<std::string, uint8_t>>> subSubscriptionTree =
            nextTopicLevel.getSubscriptionTree(composedAbsoluteTopicLevelName + "/");

        topicLevelTree.insert(subSubscriptionTree.begin(), subSubscriptionTree.end());
    }

    return topicLevelTree;
}

References clientIds, getSubscriptionTree(), and topicLevels.

Referenced by getSubscriptionTree(), and getSubscriptionTree().

◆ publish()

void iot::mqtt::server::broker::SubscriptionTree::TopicLevel::publish ( Message & message,
std::string topic )

Definition at line 168 of file SubscriptionTree.cpp.

void SubscriptionTree::TopicLevel::publish(Message& message, std::string topic) {
    if (topic.empty()) {
        LOG(INFO) << "MQTT Broker: Found match:";
        LOG(INFO) << "MQTT Broker:   Topic: '" << message.getTopic() << "';";
        LOG(INFO) << "MQTT Broker:   Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());

        LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
        for (auto& [clientId, clientQoS] : clientIds) {
            broker->sendPublish(clientId, message, clientQoS, false);
        }
        LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";

        const auto nextHashLevel = topicLevels.find("#");
        if (nextHashLevel != topicLevels.end()) {
            LOG(INFO) << "MQTT Broker: Found parent match:";
            LOG(INFO) << "MQTT Broker:   Topic: '" << message.getTopic() << "'";
            LOG(INFO) << "MQTT Broker:   Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());

            LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
            for (auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
                broker->sendPublish(clientId, message, clientQoS, false);
            }
            LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
        }
    } else {
        const std::string topicLevel = topic.substr(0, topic.find('/'));

        topic.erase(0, topicLevel.size() + 1);

        auto foundNode = topicLevels.find(topicLevel);
        if (foundNode != topicLevels.end()) {
            foundNode->second.publish(message, topic);
        }

        foundNode = topicLevels.find("+");
        if (foundNode != topicLevels.end()) {
            foundNode->second.publish(message, topic);
        }

        foundNode = topicLevels.find("#");
        if (foundNode != topicLevels.end()) {
            LOG(INFO) << "MQTT Broker: Found match:";
            LOG(INFO) << "MQTT Broker:   Topic: '" << message.getTopic() << "'";
            LOG(INFO) << "MQTT Broker:   Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());

            LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() << "' ...";
            for (auto& [clientId, clientQoS] : foundNode->second.clientIds) {
                broker->sendPublish(clientId, message, clientQoS, false);
            }
            LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
        }
    }
}

References clientIds, iot::mqtt::server::broker::Message::getMessage(), iot::mqtt::server::broker::Message::getTopic(), publish(), iot::mqtt::server::broker::Broker::sendPublish(), iot::mqtt::Mqtt::toHexString(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscriptionTree::publish(), and publish().
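
The wildcard handling in the listing above comes down to three lookups per level: the literal level name, "+", and "#". The sketch below isolates that matching logic in a simplified tree. `Node`, `addFilter`, and `match` are hypothetical names, not part of SNode.C, and the sketch collects client ids into a set instead of calling `Broker::sendPublish()`. As a result, a client that matches through several filters appears once here, whereas the listing sends one PUBLISH per matching filter.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical, simplified stand-in for TopicLevel (no QoS, no broker).
struct Node {
    std::set<std::string> clients;
    std::map<std::string, Node> children;
};

// Registers `client` on `filter`, creating one node per topic level.
void addFilter(Node& node, std::string filter, const std::string& client) {
    if (filter.empty()) {
        node.clients.insert(client);
        return;
    }
    const std::string level = filter.substr(0, filter.find('/'));
    filter.erase(0, level.size() + 1);
    addFilter(node.children[level], filter, client);
}

// Collects every client whose filter matches `topic`.
void match(const Node& node, std::string topic, std::set<std::string>& out) {
    // A "#" child matches all remaining levels, including none at all.
    const auto hash = node.children.find("#");
    if (hash != node.children.end()) {
        out.insert(hash->second.clients.begin(), hash->second.clients.end());
    }
    if (topic.empty()) {
        out.insert(node.clients.begin(), node.clients.end());
        return;
    }
    const std::string level = topic.substr(0, topic.find('/'));
    topic.erase(0, level.size() + 1);
    const auto exact = node.children.find(level);
    if (exact != node.children.end()) {
        match(exact->second, topic, out);
    }
    // A "+" child matches exactly one level, whatever its name.
    const auto plus = node.children.find("+");
    if (plus != node.children.end()) {
        match(plus->second, topic, out);
    }
}

void matchExample() {
    Node root;
    addFilter(root, "home/+/temp", "dashboard");
    std::set<std::string> hits;
    match(root, "home/kitchen/temp", hits);
    assert(hits.count("dashboard") == 1);
}
```

Checking "#" before the empty-topic test covers both branches of the listing at once: a filter such as "home/#" matches "home" itself as well as anything below it.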

◆ subscribe()

bool iot::mqtt::server::broker::SubscriptionTree::TopicLevel::subscribe ( const std::string & clientId,
uint8_t qoS,
std::string topic )

Definition at line 143 of file SubscriptionTree.cpp.

bool SubscriptionTree::TopicLevel::subscribe(const std::string& clientId, uint8_t qoS, std::string topic) {
    if (topic.empty()) {
        LOG(INFO) << "MQTT Broker: Subscribe";
        LOG(INFO) << "MQTT Broker:   ClientId: " << clientId;

        clientIds[clientId] = qoS;
    } else {
        const std::string topicLevel = topic.substr(0, topic.find('/'));

        topic.erase(0, topicLevel.size() + 1);

        const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscriptionTree::TopicLevel(broker, topicLevel)});

        if (!it->second.subscribe(clientId, qoS, topic)) {
            LOG(DEBUG) << "MQTT Broker: Erase topic: " << topicLevel << " /" << topic;

            topicLevels.erase(it);
        } else {
            LOG(INFO) << "MQTT Broker:   Topic: " << topicLevel << " /" << topic;
        }
    }

    return !topicLevels.empty() || !clientIds.empty();
}

References clientIds, subscribe(), TopicLevel(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscriptionTree::subscribe(), and subscribe().
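
subscribe(), publish(), and unsubscribe() all peel one level off the front of the topic with the same pair of `substr()` and `erase()` calls. The sketch below wraps that pair in a helper to show how it behaves at the edges; `splitFirstLevel` is a hypothetical name introduced here and does not exist in SNode.C.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: removes the first topic level from `topic` and returns it.
std::string splitFirstLevel(std::string& topic) {
    // find() returns npos when there is no '/', so substr() yields the whole string.
    const std::string level = topic.substr(0, topic.find('/'));
    // erase() clamps the count to the string length, so size + 1 is safe
    // even when no separator follows the level.
    topic.erase(0, level.size() + 1);
    return level;
}

void splitFirstLevelExample() {
    std::string topic = "home/kitchen/temp";
    assert(splitFirstLevel(topic) == "home");
    assert(topic == "kitchen/temp");
}
```

One consequence of this splitting is that a trailing separator leaves an empty remainder, so "a/" and "a" end the recursion at the same node.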

◆ toJson()

nlohmann::json iot::mqtt::server::broker::SubscriptionTree::TopicLevel::toJson ( ) const

Definition at line 267 of file SubscriptionTree.cpp.

nlohmann::json SubscriptionTree::TopicLevel::toJson() const {
    nlohmann::json json;

    for (const auto& [topicLevelName, topicLevel] : topicLevels) {
        json["topic_filter"][topicLevelName] = topicLevel.toJson();
    }

    for (const auto& [subscriber, qoS] : clientIds) {
        json["qos_map"][subscriber] = qoS;
    }

    return json;
}

References clientIds, toJson(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscriptionTree::toJson(), and toJson().

◆ unsubscribe() [1/2]

bool iot::mqtt::server::broker::SubscriptionTree::TopicLevel::unsubscribe ( const std::string & clientId)

Definition at line 247 of file SubscriptionTree.cpp.

bool SubscriptionTree::TopicLevel::unsubscribe(const std::string& clientId) {
    if (clientIds.erase(clientId) != 0) {
        LOG(INFO) << "MQTT Broker: Unsubscribe";
        LOG(INFO) << "MQTT Broker:   ClientId: " << clientId;
        LOG(INFO) << "MQTT Broker:   Topic: " << topicLevel;
    }

    for (auto it = topicLevels.begin(); it != topicLevels.end();) {
        if (it->second.unsubscribe(clientId)) {
            LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;

            it = topicLevels.erase(it);
        } else {
            ++it;
        }
    }

    return clientIds.empty() && topicLevels.empty();
}

References clientIds, topicLevel, topicLevels, and unsubscribe().

Referenced by unsubscribe(), and iot::mqtt::server::broker::SubscriptionTree::unsubscribe().
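
The boolean returned by unsubscribe() tells the caller that the node no longer holds any client or child and can be erased, which lets empty branches be pruned on the way back up the recursion. A minimal standalone sketch of that pattern follows; `PruneNode` and `removeClient` are hypothetical names, and logging and QoS values are left out.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical, simplified stand-in for TopicLevel (no QoS, no broker).
struct PruneNode {
    std::set<std::string> clients;
    std::map<std::string, PruneNode> children;
};

// Removes `client` everywhere below `node`. Returns true when `node`
// ends up empty, so the caller knows it can erase it.
bool removeClient(PruneNode& node, const std::string& client) {
    node.clients.erase(client);
    for (auto it = node.children.begin(); it != node.children.end();) {
        if (removeClient(it->second, client)) {
            // erase() returns the iterator following the removed element,
            // which keeps the loop valid while the map shrinks.
            it = node.children.erase(it);
        } else {
            ++it;
        }
    }
    return node.clients.empty() && node.children.empty();
}

void removeClientExample() {
    PruneNode root;
    root.children["home"].children["temp"].clients.insert("sensor-ui");
    assert(removeClient(root, "sensor-ui"));
    assert(root.children.empty());
}
```

Incrementing the iterator only in the `else` branch is what makes erasing during iteration safe; advancing after `erase()` as well would skip an element.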

◆ unsubscribe() [2/2]

bool iot::mqtt::server::broker::SubscriptionTree::TopicLevel::unsubscribe ( const std::string & clientId,
std::string topic )

Definition at line 222 of file SubscriptionTree.cpp.

bool SubscriptionTree::TopicLevel::unsubscribe(const std::string& clientId, std::string topic) {
    if (topic.empty()) {
        if (clientIds.erase(clientId) != 0) {
            LOG(INFO) << "MQTT Broker: Unsubscribe";
            LOG(INFO) << "MQTT Broker:   ClientId: " << clientId;
            LOG(INFO) << "MQTT Broker:   Topic: " << topicLevel;
        }
    } else {
        const std::string topicLevel = topic.substr(0, topic.find('/'));

        auto&& it = topicLevels.find(topicLevel);
        if (it != topicLevels.end()) {
            topic.erase(0, topicLevel.size() + 1);

            if (it->second.unsubscribe(clientId, topic)) {
                LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;

                topicLevels.erase(it);
            }
        }
    }

    return clientIds.empty() && topicLevels.empty();
}

References clientIds, topicLevel, topicLevels, and unsubscribe().

Referenced by unsubscribe(), and iot::mqtt::server::broker::SubscriptionTree::unsubscribe().

Member Data Documentation

◆ broker

iot::mqtt::server::broker::Broker* iot::mqtt::server::broker::SubscriptionTree::TopicLevel::broker
private

Definition at line 114 of file SubscriptionTree.h.

◆ clientIds

std::map<std::string, uint8_t> iot::mqtt::server::broker::SubscriptionTree::TopicLevel::clientIds
private

◆ topicLevel

std::string iot::mqtt::server::broker::SubscriptionTree::TopicLevel::topicLevel
private

Definition at line 119 of file SubscriptionTree.h.

Referenced by TopicLevel(), unsubscribe(), and unsubscribe().

◆ topicLevels

std::map<std::string, TopicLevel> iot::mqtt::server::broker::SubscriptionTree::TopicLevel::topicLevels
private

The documentation for this class was generated from the following files:

SubscriptionTree.h
SubscriptionTree.cpp