90 LOG(DEBUG) <<
"MQTT Broker: Retain:";
91 LOG(DEBUG) <<
"MQTT Broker: Topic: " << message.getTopic();
92 LOG(DEBUG) <<
"MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
93 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(message
.getQoS());
95 this->message = message;
97 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
99 topic.erase(0, topicLevel.size() + 1);
101 subTopicLevels.insert({topicLevel, RetainTree::TopicLevel(broker)}).first->second.retain(message, topic);
107 LOG(DEBUG) <<
"MQTT Broker: Release retained:";
108 LOG(DEBUG) <<
"MQTT Broker: Topic: " << message.getTopic();
112 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
114 auto&& it = subTopicLevels.find(topicLevel);
115 if (it != subTopicLevels.end()) {
116 topic.erase(0, topicLevel.size() + 1);
118 if (it->second.release(topic)) {
119 LOG(DEBUG) <<
" Erase: " << topicLevel;
121 subTopicLevels.erase(it);
126 return subTopicLevels.empty() && message.getMessage().empty();
131 if (!message.getTopic().empty()) {
132 LOG(INFO) <<
"MQTT Broker: Retained Topic found:";
133 LOG(INFO) <<
"MQTT Broker: Topic: " << message.getTopic();
134 LOG(INFO) <<
"MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
135 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(message.getQoS());
136 LOG(DEBUG) <<
"MQTT Broker: Client:";
137 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(qoS);
139 LOG(INFO) <<
"MQTT Broker: Distributing message ...";
140 broker->sendPublish(clientId, message, std::min(message.getQoS(), qoS),
true);
141 LOG(INFO) <<
"MQTT Broker: ... distributing message completed";
144 const std::string topicLevel = topic.substr(0, topic.find(
'/'));
146 topic.erase(0, topicLevel.size() + 1);
148 auto foundNode = subTopicLevels.find(topicLevel);
149 if (foundNode != subTopicLevels.end()) {
150 foundNode->second.appear(clientId, topic, qoS);
151 }
else if (topicLevel ==
"+") {
152 for (
auto& [notUsed, topicTree] : subTopicLevels) {
153 topicTree.appear(clientId, topic, qoS);
155 }
else if (topicLevel ==
"#") {
156 appear(clientId, qoS);
162 if (!message.getTopic().empty()) {
163 LOG(INFO) <<
"MQTT Broker: Retained Topic found:";
164 LOG(INFO) <<
"MQTT Broker: Topic: " << message.getTopic();
165 LOG(INFO) <<
"MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
166 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(message.getQoS());
167 LOG(DEBUG) <<
"MQTT Broker: Client:";
168 LOG(DEBUG) <<
"MQTT Broker: QoS: " <<
static_cast<uint16_t>(clientQoS);
170 LOG(INFO) <<
"MQTT Broker: Distributing message ...";
171 broker->sendPublish(clientId, message, std::min(message.getQoS(), clientQoS),
true);
172 LOG(INFO) <<
"MQTT Broker: ... distributing message completed";
175 for (
auto& [topicLevel, topicTree] : subTopicLevels) {
176 topicTree.appear(clientId, clientQoS);