SNode.C
Loading...
Searching...
No Matches
iot::mqtt::server::broker::SubscribtionTree::TopicLevel Class Reference
Collaboration diagram for iot::mqtt::server::broker::SubscribtionTree::TopicLevel:

Public Member Functions

 TopicLevel (iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)
 
void appear (const std::string &clientId, const std::string &topic)
 
bool subscribe (const std::string &clientId, uint8_t qoS, std::string topic)
 
void publish (Message &message, std::string topic)
 
bool unsubscribe (const std::string &clientId, std::string topic)
 
bool unsubscribe (const std::string &clientId)
 
std::list< std::string > getSubscriptions (const std::string &clientId) const
 
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree () const
 
TopicLevelfromJson (const nlohmann::json &json)
 
nlohmann::json toJson () const
 
void clear ()
 

Private Member Functions

std::list< std::string > getSubscriptions (const std::string &absoluteTopicLevel, const std::string &clientId) const
 
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree (const std::string &absoluteTopicLevel) const
 

Private Attributes

iot::mqtt::server::broker::Brokerbroker
 
std::map< std::string, uint8_t > clientIds
 
std::map< std::string, TopicLeveltopicLevels
 
std::string topicLevel
 

Detailed Description

Definition at line 87 of file SubscribtionTree.h.

Constructor & Destructor Documentation

◆ TopicLevel()

iot::mqtt::server::broker::SubscribtionTree::TopicLevel::TopicLevel ( iot::mqtt::server::broker::Broker broker,
const std::string &  topicLevel 
)
explicit

Definition at line 130 of file SubscribtionTree.cpp.

References topicLevel.

Referenced by clear(), fromJson(), subscribe(), and iot::mqtt::server::broker::SubscribtionTree::SubscribtionTree().

Here is the caller graph for this function:

Member Function Documentation

◆ appear()

void iot::mqtt::server::broker::SubscribtionTree::TopicLevel::appear ( const std::string &  clientId,
const std::string &  topic 
)

Definition at line 135 of file SubscribtionTree.cpp.

135 {
136 if (clientIds.contains(clientId)) {
137 broker->appear(clientId, topic, clientIds[clientId]);
138 }
139
140 for (auto& [topicLevel, subscribtion] : topicLevels) {
141 subscribtion.appear(clientId, std::string(topic).append(topic.empty() ? "" : "/").append(topicLevel));
142 }
143 }
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
Definition Broker.cpp:146

References appear(), iot::mqtt::server::broker::Broker::appear(), clientIds, and topicLevels.

Referenced by iot::mqtt::server::broker::SubscribtionTree::appear(), and appear().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ clear()

void iot::mqtt::server::broker::SubscribtionTree::TopicLevel::clear ( )

Definition at line 347 of file SubscribtionTree.cpp.

347 {
348 *this = TopicLevel(broker, "");
349 }
TopicLevel(iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)

References TopicLevel().

Referenced by iot::mqtt::server::broker::SubscribtionTree::clear().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ fromJson()

SubscribtionTree::TopicLevel & iot::mqtt::server::broker::SubscribtionTree::TopicLevel::fromJson ( const nlohmann::json &  json)

Definition at line 328 of file SubscribtionTree.cpp.

328 {
329 clientIds.clear();
330 topicLevels.clear();
331
332 if (json.contains("qos_map")) {
333 for (const auto& subscriber : json["qos_map"].items()) {
334 clientIds.emplace(subscriber.key(), subscriber.value());
335 }
336 }
337
338 if (json.contains("topic_filter")) {
339 for (const auto& topicLevelItem : json["topic_filter"].items()) {
340 topicLevels.emplace(topicLevelItem.key(), TopicLevel(broker, topicLevelItem.key()).fromJson(topicLevelItem.value()));
341 }
342 }
343
344 return *this;
345 }

References clientIds, fromJson(), TopicLevel(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscribtionTree::fromJson(), and fromJson().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getSubscriptions() [1/2]

std::list< std::string > iot::mqtt::server::broker::SubscribtionTree::TopicLevel::getSubscriptions ( const std::string &  absoluteTopicLevel,
const std::string &  clientId 
) const
private

Definition at line 291 of file SubscribtionTree.cpp.

292 {
293 std::list<std::string> topicLevelList;
294
295 for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
296 const std::string currentAbsoluteTopicLevel = absoluteTopicLevel + topicLevelName;
297
298 if (nextTopicLevel.clientIds.contains(clientId)) {
299 topicLevelList.push_back(currentAbsoluteTopicLevel);
300 }
301
302 topicLevelList.splice(topicLevelList.end(), nextTopicLevel.getSubscriptions(currentAbsoluteTopicLevel + "/", clientId));
303 }
304
305 return topicLevelList;
306 }

References clientIds, getSubscriptions(), and topicLevels.

Referenced by getSubscriptions(), and getSubscriptions().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getSubscriptions() [2/2]

std::list< std::string > iot::mqtt::server::broker::SubscribtionTree::TopicLevel::getSubscriptions ( const std::string &  clientId) const

Definition at line 283 of file SubscribtionTree.cpp.

283 {
284 return getSubscriptions("", clientId);
285 }
std::list< std::string > getSubscriptions(const std::string &clientId) const

References getSubscriptions().

Referenced by iot::mqtt::server::broker::SubscribtionTree::getSubscriptions().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getSubscriptionTree() [1/2]

std::map< std::string, std::list< std::pair< std::string, uint8_t > > > iot::mqtt::server::broker::SubscribtionTree::TopicLevel::getSubscriptionTree ( ) const

Definition at line 287 of file SubscribtionTree.cpp.

287 {
288 return getSubscriptionTree("");
289 }
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const

References getSubscriptionTree().

Referenced by iot::mqtt::server::broker::SubscribtionTree::getSubscriptionTree().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getSubscriptionTree() [2/2]

std::map< std::string, std::list< std::pair< std::string, uint8_t > > > iot::mqtt::server::broker::SubscribtionTree::TopicLevel::getSubscriptionTree ( const std::string &  absoluteTopicLevel) const
private

Definition at line 309 of file SubscribtionTree.cpp.

309 {
310 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> topicLevelTree;
311
312 for (const auto& [topicLevelName, nextTopicLevel] : topicLevels) {
313 const std::string composedAbsoluteTopicLevelName = absoluteTopicLevel + topicLevelName;
314
315 for (const auto& clientId : nextTopicLevel.clientIds) {
316 topicLevelTree[composedAbsoluteTopicLevelName].emplace_back(clientId);
317 }
318
319 std::map<std::string, std::list<std::pair<std::string, uint8_t>>> subSubscriptionTree =
320 nextTopicLevel.getSubscriptionTree(composedAbsoluteTopicLevelName + "/");
321
322 topicLevelTree.insert(subSubscriptionTree.begin(), subSubscriptionTree.end());
323 }
324
325 return topicLevelTree;
326 }

References clientIds, getSubscriptionTree(), and topicLevels.

Referenced by getSubscriptionTree(), and getSubscriptionTree().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ publish()

void iot::mqtt::server::broker::SubscribtionTree::TopicLevel::publish ( Message message,
std::string  topic 
)

Definition at line 170 of file SubscribtionTree.cpp.

170 {
171 if (topic.empty()) {
172 LOG(INFO) << "MQTT Broker: Found match:";
173 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "';";
174 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
175
176 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
177 for (auto& [clientId, clientQoS] : clientIds) {
178 broker->sendPublish(clientId, message, clientQoS, false);
179 }
180 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
181
182 const auto nextHashLevel = topicLevels.find("#");
183 if (nextHashLevel != topicLevels.end()) {
184 LOG(INFO) << "MQTT Broker: Found parent match:";
185 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
186 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
187
188 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
189 for (auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
190 broker->sendPublish(clientId, message, clientQoS, false);
191 }
192 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
193 }
194 } else {
195 const std::string topicLevel = topic.substr(0, topic.find('/'));
196
197 topic.erase(0, topicLevel.size() + 1);
198
199 auto foundNode = topicLevels.find(topicLevel);
200 if (foundNode != topicLevels.end()) {
201 foundNode->second.publish(message, topic);
202 }
203
204 foundNode = topicLevels.find("+");
205 if (foundNode != topicLevels.end()) {
206 foundNode->second.publish(message, topic);
207 }
208
209 foundNode = topicLevels.find("#");
210 if (foundNode != topicLevels.end()) {
211 LOG(INFO) << "MQTT Broker: Found match:";
212 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
213 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
214
215 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() << "' ...";
216 for (auto& [clientId, clientQoS] : foundNode->second.clientIds) {
217 broker->sendPublish(clientId, message, clientQoS, false);
218 }
219 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
220 }
221 }
222 }
static std::string toHexString(const std::vector< char > &data)
Definition Mqtt.cpp:389
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
Definition Broker.cpp:237

References clientIds, iot::mqtt::server::broker::Message::getMessage(), iot::mqtt::server::broker::Message::getTopic(), publish(), iot::mqtt::server::broker::Broker::sendPublish(), iot::mqtt::Mqtt::toHexString(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscribtionTree::publish(), and publish().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ subscribe()

bool iot::mqtt::server::broker::SubscribtionTree::TopicLevel::subscribe ( const std::string &  clientId,
uint8_t  qoS,
std::string  topic 
)

Definition at line 145 of file SubscribtionTree.cpp.

145 {
146 if (topic.empty()) {
147 LOG(INFO) << "MQTT Broker: Subscribe";
148 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
149
150 clientIds[clientId] = qoS;
151 } else {
152 const std::string topicLevel = topic.substr(0, topic.find('/'));
153
154 topic.erase(0, topicLevel.size() + 1);
155
156 const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscribtionTree::TopicLevel(broker, topicLevel)});
157
158 if (!it->second.subscribe(clientId, qoS, topic)) {
159 LOG(DEBUG) << "MQTT Broker: Erase topic: " << topicLevel << " /" << topic;
160
161 topicLevels.erase(it);
162 } else {
163 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel << " /" << topic;
164 }
165 }
166
167 return !topicLevels.empty() || !clientIds.empty();
168 }

References clientIds, subscribe(), TopicLevel(), and topicLevels.

Referenced by subscribe(), and iot::mqtt::server::broker::SubscribtionTree::subscribe().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ toJson()

nlohmann::json iot::mqtt::server::broker::SubscribtionTree::TopicLevel::toJson ( ) const

Definition at line 269 of file SubscribtionTree.cpp.

269 {
270 nlohmann::json json;
271
272 for (const auto& [topicLevelName, topicLevel] : topicLevels) {
273 json["topic_filter"][topicLevelName] = topicLevel.toJson();
274 }
275
276 for (const auto& [subscriber, qoS] : clientIds) {
277 json["qos_map"][subscriber] = qoS;
278 }
279
280 return json;
281 }

References clientIds, toJson(), and topicLevels.

Referenced by iot::mqtt::server::broker::SubscribtionTree::toJson(), and toJson().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ unsubscribe() [1/2]

bool iot::mqtt::server::broker::SubscribtionTree::TopicLevel::unsubscribe ( const std::string &  clientId)

Definition at line 249 of file SubscribtionTree.cpp.

249 {
250 if (clientIds.erase(clientId) != 0) {
251 LOG(INFO) << "MQTT Broker: Unsubscribe";
252 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
253 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
254 }
255
256 for (auto it = topicLevels.begin(); it != topicLevels.end();) {
257 if (it->second.unsubscribe(clientId)) {
258 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
259
260 it = topicLevels.erase(it);
261 } else {
262 ++it;
263 }
264 }
265
266 return clientIds.empty() && topicLevels.empty();
267 }

References clientIds, topicLevel, topicLevels, and unsubscribe().

Referenced by iot::mqtt::server::broker::SubscribtionTree::unsubscribe(), and unsubscribe().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ unsubscribe() [2/2]

bool iot::mqtt::server::broker::SubscribtionTree::TopicLevel::unsubscribe ( const std::string &  clientId,
std::string  topic 
)

Definition at line 224 of file SubscribtionTree.cpp.

224 {
225 if (topic.empty()) {
226 if (clientIds.erase(clientId) != 0) {
227 LOG(INFO) << "MQTT Broker: Unsubscribe";
228 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
229 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
230 }
231 } else {
232 const std::string topicLevel = topic.substr(0, topic.find('/'));
233
234 auto&& it = topicLevels.find(topicLevel);
235 if (it != topicLevels.end()) {
236 topic.erase(0, topicLevel.size() + 1);
237
238 if (it->second.unsubscribe(clientId, topic)) {
239 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
240
241 topicLevels.erase(it);
242 }
243 }
244 }
245
246 return clientIds.empty() && topicLevels.empty();
247 }

References clientIds, topicLevel, topicLevels, and unsubscribe().

Referenced by unsubscribe(), and iot::mqtt::server::broker::SubscribtionTree::unsubscribe().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ broker

iot::mqtt::server::broker::Broker* iot::mqtt::server::broker::SubscribtionTree::TopicLevel::broker
private

Definition at line 114 of file SubscribtionTree.h.

◆ clientIds

std::map<std::string, uint8_t> iot::mqtt::server::broker::SubscribtionTree::TopicLevel::clientIds
private

◆ topicLevel

std::string iot::mqtt::server::broker::SubscribtionTree::TopicLevel::topicLevel
private

Definition at line 119 of file SubscribtionTree.h.

Referenced by TopicLevel(), unsubscribe(), and unsubscribe().

◆ topicLevels

std::map<std::string, TopicLevel> iot::mqtt::server::broker::SubscribtionTree::TopicLevel::topicLevels
private

The documentation for this class was generated from the following files: