MQTTSuite
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 * Tobias Pfeil
6 * 2025, 2026
7 *
8 * This program is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the Free
10 * Software Foundation, either version 3 of the License, or (at your option)
11 * any later version.
12 *
13 * This program is distributed in the hope that it will be useful, but WITHOUT
14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
15 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program. If not, see <https://www.gnu.org/licenses/>.
20 */
21
22/*
23 * MIT License
24 *
25 * Permission is hereby granted, free of charge, to any person obtaining a copy
26 * of this software and associated documentation files (the "Software"), to deal
27 * in the Software without restriction, including without limitation the rights
28 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
29 * copies of the Software, and to permit persons to whom the Software is
30 * furnished to do so, subject to the following conditions:
31 *
32 * The above copyright notice and this permission notice shall be included in
33 * all copies or substantial portions of the Software.
34 *
35 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
36 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
37 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
38 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
39 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
40 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
41 * THE SOFTWARE.
42 */
43
44#include "Mqtt.h"
45
46#include "lib/MappingAdminRouter.h"
47#include "lib/MqttMapper.h"
48
49#include <iot/mqtt/Topic.h>
50#include <iot/mqtt/packets/Connack.h>
51#include <iot/mqtt/packets/Publish.h>
52
53#ifndef DOXYGEN_SHOULD_SKIP_THIS
54
55#include <algorithm>
56#include <functional>
57#include <nlohmann/json_fwd.hpp>
58
59#endif
60
61namespace mqtt::mqttintegrator::lib {
62
63 std::set<Mqtt*> Mqtt::mqttInstances;
64
66 utils::Timeval when = 0;
67 std::size_t seq = 0;
68 iot::mqtt::packets::Publish publish;
69 utils::Timeval delay;
70 };
71
72 Mqtt::Mqtt(const std::string& connectionName,
73 std::shared_ptr<mqtt::lib::MqttMapper> mqttMapper,
74 const std::string& sessionStoreFileName)
75 : iot::mqtt::client::Mqtt(connectionName, //
76 mqttMapper->getConnection()["client_id"],
77 mqttMapper->getConnection()["keep_alive"],
78 sessionStoreFileName)
79 , mqttMapper(mqttMapper)
81 , delayedQueue(this) {
82 mqttInstances.insert(this);
83 }
84
85 Mqtt::~Mqtt() {
86 mqttInstances.erase(this);
87 }
88
89 mqtt::lib::admin::ReloadResult Mqtt::updateSubscriptions(bool mustReconnect) {
90 mqtt::lib::admin::ReloadResult reloadResult;
91
92 reloadResult.instances = mqttInstances.size();
93 if (mustReconnect) {
94 reloadResult.mode = "reconnect";
95 } else {
96 reloadResult.mode = "hot";
97 }
98
99 for (Mqtt* mqtt : mqttInstances) {
100 if (mustReconnect) {
101 mqtt->sendDisconnect();
102 } else {
103 auto [subscribeCount, unsubscribeCount] = mqtt->resubscribe();
104
105 reloadResult.subscribed += subscribeCount;
106 reloadResult.unsubscribed += unsubscribeCount;
107 }
108 }
109
110 return reloadResult;
111 }
112
114 const nlohmann::json& connection = mqttMapper->getConnection();
115
116 sendConnect(connection["clean_session"],
117 connection["will_topic"],
118 connection["will_message"],
119 connection["will_qos"],
120 connection["will_retain"],
121 connection["username"],
122 connection["password"]);
123 }
124
125 bool Mqtt::onSignal(int signum) {
126 sendDisconnect();
127 return Super::onSignal(signum);
128 }
129
130 void Mqtt::onConnack(const iot::mqtt::packets::Connack& connack) {
131 if (connack.getReturnCode() == 0 && !connack.getSessionPresent()) {
132 sendSubscribe(currentSubscriptions);
133 }
134 }
135
136 void Mqtt::onPublish(const iot::mqtt::packets::Publish& publish) {
137 auto [immediatePublishes, scheduledPublishes] = mqttMapper->getMappings(publish);
138
139 for (const mqtt::lib::MqttMapper::ScheduledPublish& delayedPublish : scheduledPublishes) {
140 delayedQueue.delayPublish(delayedPublish.delay, delayedPublish.publish);
141 }
142
143 for (const iot::mqtt::packets::Publish& mappedPublish : immediatePublishes) {
144 sendPublish(mappedPublish.getTopic(), mappedPublish.getMessage(), mappedPublish.getQoS(), mappedPublish.getRetain());
145
146 onPublish(mappedPublish);
147 }
148 }
149
150 std::pair<std::size_t, std::size_t> Mqtt::resubscribe() {
151 std::list<iot::mqtt::Topic> newSubscriptions = mqttMapper->extractSubscriptions();
152
153 std::list<std::string> topicsToUnsubscribe;
154 for (const auto& currentTopic : currentSubscriptions) {
155 const bool existsInNew = std::any_of(newSubscriptions.begin(), newSubscriptions.end(), [&](const auto& newTopic) {
156 return currentTopic.getName() == newTopic.getName() && currentTopic.getQoS() == newTopic.getQoS();
157 });
158
159 if (!existsInNew) {
160 topicsToUnsubscribe.push_back(currentTopic.getName());
161 }
162 }
163
164 if (!topicsToUnsubscribe.empty()) {
165 sendUnsubscribe(topicsToUnsubscribe);
166 }
167
168 std::list<iot::mqtt::Topic> topicsToSubscribe;
169 for (const auto& newTopic : newSubscriptions) {
170 const bool existsInOld = std::any_of(currentSubscriptions.begin(), currentSubscriptions.end(), [&](const auto& currentTopic) {
171 return currentTopic.getName() == newTopic.getName() && currentTopic.getQoS() == newTopic.getQoS();
172 });
173
174 if (!existsInOld) {
175 topicsToSubscribe.push_back(newTopic);
176 }
177 }
178
179 if (!topicsToSubscribe.empty()) {
180 sendSubscribe(topicsToSubscribe);
181 }
182
183 currentSubscriptions = newSubscriptions;
184
185 return {topicsToSubscribe.size(), topicsToUnsubscribe.size()};
186 }
187
189 if (a.when != b.when) {
190 return a.when > b.when;
191 }
192
193 return a.seq > b.seq;
194 }
195
197 : mqtt(mqtt) {
198 }
199
201 delayTimer.cancel();
202 }
203
205 const auto now = utils::Timeval::currentTime();
206
207 while (!empty() && top().when <= now) {
208 const iot::mqtt::packets::Publish duePublish = top().publish;
209 pop();
210
211 mqtt->sendPublish(duePublish.getTopic(), duePublish.getMessage(), duePublish.getQoS(), duePublish.getRetain());
212
213 mqtt->onPublish(duePublish);
214 }
215 }
216
218 delayTimer.cancel();
219
220 auto delay = top().when - utils::Timeval::currentTime();
221 if (delay < utils::Timeval{}) {
222 delay = utils::Timeval{};
223 }
224
225 delayTimer = core::timer::Timer::singleshotTimer(
226 [this]() {
228
229 if (!empty()) {
231 }
232 },
233 delay);
234 }
235
236 void Mqtt::DelayedQueue::delayPublish(const utils::Timeval& delay, const iot::mqtt::packets::Publish& publish) {
237 minHeap.emplace(utils::Timeval::currentTime() + delay, nextSeq++, publish, delay);
239 }
240
241 bool Mqtt::DelayedQueue::empty() const {
242 return minHeap.empty();
243 }
244
246 return minHeap.top();
247 }
248
250 minHeap.pop();
251 }
252
253} // namespace mqtt::mqttintegrator::lib
std::list< iot::mqtt::Topic > extractSubscriptions() const
MappedPublishes getMappings(const iot::mqtt::packets::Publish &publish)
const nlohmann::json & getConnection() const
std::priority_queue< ScheduledPublish, std::vector< ScheduledPublish >, EarlierFirst > minHeap
Definition Mqtt.h:122
ScheduledPublish const & top() const
Definition Mqtt.cpp:245
void delayPublish(const utils::Timeval &delay, const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:236
static std::set< Mqtt * > mqttInstances
Definition Mqtt.h:129
void onPublish(const iot::mqtt::packets::Publish &publish) final
Definition Mqtt.cpp:136
void onConnack(const iot::mqtt::packets::Connack &connack) final
Definition Mqtt.cpp:130
std::pair< std::size_t, std::size_t > resubscribe()
Definition Mqtt.cpp:150
static mqtt::lib::admin::ReloadResult updateSubscriptions(bool mustReconnect)
Definition Mqtt.cpp:89
std::shared_ptr< mqtt::lib::MqttMapper > mqttMapper
Definition Mqtt.h:100
std::list< iot::mqtt::Topic > currentSubscriptions
Definition Mqtt.h:101
bool onSignal(int signum) final
Definition Mqtt.cpp:125
iot::mqtt::client::Mqtt Super
Definition Mqtt.h:88
iot::mqtt::packets::Publish publish
Definition MqttMapper.h:80
bool operator()(const ScheduledPublish &a, const ScheduledPublish &b) const
Definition Mqtt.cpp:188
iot::mqtt::packets::Publish publish
Definition Mqtt.cpp:68