SNode.C
Loading...
Searching...
No Matches
iot::mqtt::server::broker::SubscribtionTree::TopicLevel Class Reference
Collaboration diagram for iot::mqtt::server::broker::SubscribtionTree::TopicLevel:

Public Member Functions

 TopicLevel (iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)
 
void appear (const std::string &clientId, const std::string &topic)
 
bool subscribe (const std::string &clientId, uint8_t qoS, std::string topic)
 
void publish (Message &message, std::string topic)
 
bool unsubscribe (const std::string &clientId, std::string topic)
 
bool unsubscribe (const std::string &clientId)
 
TopicLevel & fromJson (const nlohmann::json &json)
 
nlohmann::json toJson () const
 
void clear ()
 

Private Attributes

std::map< std::string, uint8_t > clientIds
 
std::map< std::string, TopicLevel > topicLevels
 
iot::mqtt::server::broker::Broker * broker
 
std::string topicLevel
 

Detailed Description

Definition at line 58 of file SubscribtionTree.h.

Constructor & Destructor Documentation

◆ TopicLevel()

iot::mqtt::server::broker::SubscribtionTree::TopicLevel::TopicLevel ( iot::mqtt::server::broker::Broker * broker,
const std::string & topicLevel )
explicit

Member Function Documentation

◆ appear()

void iot::mqtt::server::broker::SubscribtionTree::TopicLevel::appear ( const std::string & clientId,
const std::string & topic )

Definition at line 90 of file SubscribtionTree.cpp.

90 {
91 if (clientIds.contains(clientId)) {
92 broker->appear(clientId, topic, clientIds[clientId]);
93 }
94
95 for (auto& [topicLevel, subscribtion] : topicLevels) {
96 subscribtion.appear(clientId, std::string(topic).append(topic.empty() ? "" : "/").append(topicLevel));
97 }
98 }
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)
Definition Broker.cpp:125

◆ clear()

void iot::mqtt::server::broker::SubscribtionTree::TopicLevel::clear ( )

Definition at line 257 of file SubscribtionTree.cpp.

257 {
258 *this = TopicLevel(broker, "");
259 }
TopicLevel(iot::mqtt::server::broker::Broker *broker, const std::string &topicLevel)

◆ fromJson()

SubscribtionTree::TopicLevel & iot::mqtt::server::broker::SubscribtionTree::TopicLevel::fromJson ( const nlohmann::json & json)

Definition at line 238 of file SubscribtionTree.cpp.

238 {
239 clientIds.clear();
240 topicLevels.clear();
241
242 if (json.contains("qos_map")) {
243 for (const auto& subscriber : json["qos_map"].items()) {
244 clientIds.emplace(subscriber.key(), subscriber.value());
245 }
246 }
247
248 if (json.contains("topic_filter")) {
249 for (const auto& topicLevelItem : json["topic_filter"].items()) {
250 topicLevels.emplace(topicLevelItem.key(), TopicLevel(broker, topicLevelItem.key()).fromJson(topicLevelItem.value()));
251 }
252 }
253
254 return *this;
255 }

◆ publish()

void iot::mqtt::server::broker::SubscribtionTree::TopicLevel::publish ( Message & message,
std::string topic )

Definition at line 125 of file SubscribtionTree.cpp.

125 {
126 if (topic.empty()) {
127 LOG(INFO) << "MQTT Broker: Found match:";
128 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "';";
129 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
130
131 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
132 for (auto& [clientId, clientQoS] : clientIds) {
133 broker->sendPublish(clientId, message, clientQoS, false);
134 }
135 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
136
137 const auto nextHashLevel = topicLevels.find("#");
138 if (nextHashLevel != topicLevels.end()) {
139 LOG(INFO) << "MQTT Broker: Found parent match:";
140 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
141 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
142
143 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match ...";
144 for (auto& [clientId, clientQoS] : nextHashLevel->second.clientIds) {
145 broker->sendPublish(clientId, message, clientQoS, false);
146 }
147 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
148 }
149 } else {
150 const std::string topicLevel = topic.substr(0, topic.find('/'));
151
152 topic.erase(0, topicLevel.size() + 1);
153
154 auto foundNode = topicLevels.find(topicLevel);
155 if (foundNode != topicLevels.end()) {
156 foundNode->second.publish(message, topic);
157 }
158
159 foundNode = topicLevels.find("+");
160 if (foundNode != topicLevels.end()) {
161 foundNode->second.publish(message, topic);
162 }
163
164 foundNode = topicLevels.find("#");
165 if (foundNode != topicLevels.end()) {
166 LOG(INFO) << "MQTT Broker: Found match:";
167 LOG(INFO) << "MQTT Broker: Topic: '" << message.getTopic() << "'";
168 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
169
170 LOG(INFO) << "MQTT Broker: Distribute PUBLISH for match '" << message.getTopic() << "' ...";
171 for (auto& [clientId, clientQoS] : foundNode->second.clientIds) {
172 broker->sendPublish(clientId, message, clientQoS, false);
173 }
174 LOG(INFO) << "MQTT Broker: ... distributing PUBLISH for match completed";
175 }
176 }
177 }
static std::string toHexString(const std::vector< char > &data)
Definition Mqtt.cpp:367
void sendPublish(const std::string &clientId, Message &message, uint8_t qoS, bool retain)
Definition Broker.cpp:204

◆ subscribe()

bool iot::mqtt::server::broker::SubscribtionTree::TopicLevel::subscribe ( const std::string & clientId,
uint8_t qoS,
std::string topic )

Definition at line 100 of file SubscribtionTree.cpp.

100 {
101 if (topic.empty()) {
102 LOG(INFO) << "MQTT Broker: Subscribe";
103 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
104
105 clientIds[clientId] = qoS;
106 } else {
107 const std::string topicLevel = topic.substr(0, topic.find('/'));
108
109 topic.erase(0, topicLevel.size() + 1);
110
111 const auto& [it, inserted] = topicLevels.insert({topicLevel, SubscribtionTree::TopicLevel(broker, topicLevel)});
112
113 if (!it->second.subscribe(clientId, qoS, topic)) {
114 LOG(DEBUG) << "MQTT Broker: Erase topic: " << topicLevel << " /" << topic;
115
116 topicLevels.erase(it);
117 } else {
118 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel << " /" << topic;
119 }
120 }
121
122 return !topicLevels.empty() || !clientIds.empty();
123 }

◆ toJson()

nlohmann::json iot::mqtt::server::broker::SubscribtionTree::TopicLevel::toJson ( ) const

Definition at line 224 of file SubscribtionTree.cpp.

224 {
225 nlohmann::json json;
226
227 for (const auto& [topicLevelName, topicLevel] : topicLevels) {
228 json["topic_filter"][topicLevelName] = topicLevel.toJson();
229 }
230
231 for (const auto& [subscriber, qoS] : clientIds) {
232 json["qos_map"][subscriber] = qoS;
233 }
234
235 return json;
236 }

◆ unsubscribe() [1/2]

bool iot::mqtt::server::broker::SubscribtionTree::TopicLevel::unsubscribe ( const std::string & clientId)

Definition at line 204 of file SubscribtionTree.cpp.

204 {
205 if (clientIds.erase(clientId) != 0) {
206 LOG(INFO) << "MQTT Broker: Unsubscribe";
207 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
208 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
209 }
210
211 for (auto it = topicLevels.begin(); it != topicLevels.end();) {
212 if (it->second.unsubscribe(clientId)) {
213 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
214
215 it = topicLevels.erase(it);
216 } else {
217 ++it;
218 }
219 }
220
221 return clientIds.empty() && topicLevels.empty();
222 }

◆ unsubscribe() [2/2]

bool iot::mqtt::server::broker::SubscribtionTree::TopicLevel::unsubscribe ( const std::string & clientId,
std::string topic )

Definition at line 179 of file SubscribtionTree.cpp.

179 {
180 if (topic.empty()) {
181 if (clientIds.erase(clientId) != 0) {
182 LOG(INFO) << "MQTT Broker: Unsubscribe";
183 LOG(INFO) << "MQTT Broker: ClientId: " << clientId;
184 LOG(INFO) << "MQTT Broker: Topic: " << topicLevel;
185 }
186 } else {
187 const std::string topicLevel = topic.substr(0, topic.find('/'));
188
189 auto&& it = topicLevels.find(topicLevel);
190 if (it != topicLevels.end()) {
191 topic.erase(0, topicLevel.size() + 1);
192
193 if (it->second.unsubscribe(clientId, topic)) {
194 LOG(DEBUG) << "MQTT Broker: Erase Topic: " << it->first;
195
196 topicLevels.erase(it);
197 }
198 }
199 }
200
201 return clientIds.empty() && topicLevels.empty();
202 }

Member Data Documentation

◆ broker

iot::mqtt::server::broker::Broker* iot::mqtt::server::broker::SubscribtionTree::TopicLevel::broker
private

Definition at line 81 of file SubscribtionTree.h.

◆ clientIds

std::map<std::string, uint8_t> iot::mqtt::server::broker::SubscribtionTree::TopicLevel::clientIds
private

Definition at line 77 of file SubscribtionTree.h.

◆ topicLevel

std::string iot::mqtt::server::broker::SubscribtionTree::TopicLevel::topicLevel
private

Definition at line 82 of file SubscribtionTree.h.

◆ topicLevels

std::map<std::string, TopicLevel> iot::mqtt::server::broker::SubscribtionTree::TopicLevel::topicLevels
private

Definition at line 79 of file SubscribtionTree.h.


The documentation for this class was generated from the following files: