SNode.C
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "Mqtt.h"
21
22#include "MqttContext.h"
23#include "core/socket/stream/SocketConnection.h"
24#include "iot/mqtt/ControlPacketDeserializer.h"
25#include "iot/mqtt/Session.h"
26#include "iot/mqtt/packets/Puback.h"
27#include "iot/mqtt/packets/Pubcomp.h"
28#include "iot/mqtt/packets/Publish.h"
29#include "iot/mqtt/packets/Pubrec.h"
30#include "iot/mqtt/packets/Pubrel.h"
31
32#ifndef DOXYGEN_SHOULD_SKIP_THIS
33
34#include "log/Logger.h"
35#include "utils/hexdump.h"
36
37#include <functional>
38#include <iomanip>
39#include <ios>
40#include <map>
41#include <set>
42
43#endif // DOXYGEN_SHOULD_SKIP_THIS
44
45namespace iot::mqtt {
46
47 Mqtt::Mqtt(const std::string& connectionName)
48 : connectionName(connectionName) {
49 }
50
51 Mqtt::Mqtt(const std::string& connectionName, const std::string& clientId)
52 : connectionName(connectionName)
53 , clientId(clientId) {
54 }
55
56 Mqtt::~Mqtt() {
57 if (controlPacketDeserializer != nullptr) {
58 delete controlPacketDeserializer;
59 controlPacketDeserializer = nullptr;
60 }
61
62 keepAliveTimer.cancel();
63 }
64
65 void Mqtt::setMqttContext(MqttContext* mqttContext) {
66 this->mqttContext = mqttContext;
67 }
68
69 const MqttContext* Mqtt::getMqttContext() const {
70 return mqttContext;
71 }
72
73 void Mqtt::onConnected() {
74 LOG(INFO) << "MQTT: Connected";
75 }
76
77 std::size_t Mqtt::onReceivedFromPeer() {
78 std::size_t consumed = 0;
79
80 switch (state) {
81 case 0:
82 consumed = fixedHeader.deserialize(mqttContext);
84 if (!fixedHeader.isComplete()) {
85 break;
86 }
87 if (fixedHeader.isError()) {
88 mqttContext->close();
89 break;
90 }
91 printFixedHeader(fixedHeader);
92
93 controlPacketDeserializer = createControlPacketDeserializer(fixedHeader);
94
95 fixedHeader.reset();
96
97 if (controlPacketDeserializer == nullptr) {
98 LOG(DEBUG) << connectionName << " MQTT: Received packet-type is unavailable ... closing connection";
99
100 mqttContext->end(true);
101 break;
102 }
103 if (controlPacketDeserializer->isError()) {
104 LOG(DEBUG) << connectionName << " MQTT: Fixed header has error ... closing connection";
105
106 delete controlPacketDeserializer;
107 controlPacketDeserializer = nullptr;
108
109 mqttContext->end(true);
110 break;
111 }
112
113 state++;
114
115 [[fallthrough]];
116 case 1:
117 consumed += controlPacketDeserializer->deserialize(mqttContext);
118
119 if (controlPacketDeserializer->isError()) {
120 LOG(DEBUG) << connectionName << " MQTT: Control packet has error ... closing connection";
121 mqttContext->end(true);
122
123 delete controlPacketDeserializer;
124 controlPacketDeserializer = nullptr;
125
126 state = 0;
127 } else if (controlPacketDeserializer->isComplete()) {
128 deliverPacket(controlPacketDeserializer);
129
130 delete controlPacketDeserializer;
131 controlPacketDeserializer = nullptr;
132
133 state = 0;
134
135 keepAliveTimer.restart();
136 }
137
138 break;
139 }
140
141 return consumed;
142 }
143
144 void Mqtt::onDisconnected() {
145 LOG(INFO) << connectionName << " MQTT: Disconnected";
146 }
147
148 const std::string& Mqtt::getConnectionName() const {
149 return connectionName;
150 }
151
152 void Mqtt::initSession(Session* session, utils::Timeval keepAlive) {
153 this->session = session;
154
155 for (const auto& [packetIdentifier, publish] : session->publishMap) {
156 LOG(INFO) << connectionName << " MQTT: PUBLISH Resend";
157
158 send(publish);
159 }
160
161 for (const uint16_t packetIdentifier : session->pubrelPacketIdentifierSet) {
162 LOG(INFO) << connectionName << " MQTT: PUBREL Resend";
163
164 send(iot::mqtt::packets::Pubrel(packetIdentifier));
165 }
166
167 if (keepAlive > 0) {
168 keepAlive *= 1.5;
169
170 LOG(INFO) << connectionName << " MQTT: Keep alive initialized with: " << keepAlive;
171
172 keepAliveTimer = core::timer::Timer::singleshotTimer(
173 [this, keepAlive]() {
174 LOG(ERROR) << connectionName << " MQTT: Keep-alive timer expired. Interval was: " << keepAlive;
175 mqttContext->close();
176 },
177 keepAlive);
178 }
179
180 mqttContext->getSocketConnection()->setTimeout(0);
181 }
182
183 void Mqtt::send(const ControlPacket& controlPacket) const {
184 LOG(INFO) << connectionName << " MQTT: " << controlPacket.getName() << " send: " << clientId;
185
186 send(controlPacket.serialize());
187 }
188
189 void Mqtt::send(const std::vector<char>& data) const {
190 LOG(TRACE) << connectionName << " MQTT: Send data (full message):\n" << toHexString(data);
191
192 mqttContext->send(data.data(), data.size());
193 }
194
195 void Mqtt::sendPublish(const std::string& topic, const std::string& message, uint8_t qoS,
196 bool retain) { // Server & Client
197
198 uint16_t packageIdentifier = qoS != 0 ? getPacketIdentifier() : 0;
199
200 send(iot::mqtt::packets::Publish(packageIdentifier, topic, message, qoS, false, retain));
201
202 LOG(INFO) << connectionName << " MQTT: Topic: " << topic;
203 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(message);
204 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(qoS);
205 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << _packetIdentifier;
206 LOG(DEBUG) << connectionName << " MQTT: DUP: " << false;
207 LOG(DEBUG) << connectionName << " MQTT: Retain: " << retain;
208
209 if (qoS == 2) {
210 session->publishMap.emplace(packageIdentifier,
211 iot::mqtt::packets::Publish(packageIdentifier, topic, message, qoS, true, retain));
212 }
213 }
214
215 void Mqtt::sendPuback(uint16_t packetIdentifier) const { // Server & Client
216 send(iot::mqtt::packets::Puback(packetIdentifier));
217 }
218
219 void Mqtt::sendPubrec(uint16_t packetIdentifier) const { // Server & Client
220 send(iot::mqtt::packets::Pubrec(packetIdentifier));
221 }
222
223 void Mqtt::sendPubrel(uint16_t packetIdentifier) const { // Server & Client
224 send(iot::mqtt::packets::Pubrel(packetIdentifier));
225 }
226
227 void Mqtt::sendPubcomp(uint16_t packetIdentifier) const { // Server & Client
228 send(iot::mqtt::packets::Pubcomp(packetIdentifier));
229 }
230
231 void Mqtt::onPublish([[maybe_unused]] const packets::Publish& publish) {
232 }
233
234 void Mqtt::onPuback([[maybe_unused]] const iot::mqtt::packets::Puback& puback) {
235 }
236
237 void Mqtt::onPubrec([[maybe_unused]] const iot::mqtt::packets::Pubrec& pubrec) {
238 }
239
240 void Mqtt::onPubrel([[maybe_unused]] const iot::mqtt::packets::Pubrel& pubrel) {
241 }
242
243 void Mqtt::onPubcomp([[maybe_unused]] const iot::mqtt::packets::Pubcomp& pubcomp) {
244 }
245
246 bool Mqtt::_onPublish(const packets::Publish& publish) {
247 bool deliver = true;
248
249 LOG(INFO) << connectionName << " MQTT: Topic: " << publish.getTopic();
250 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(publish.getMessage());
251 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(publish.getQoS());
252 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << publish.getPacketIdentifier();
253 LOG(DEBUG) << connectionName << " MQTT: DUP: " << publish.getDup();
254 LOG(DEBUG) << connectionName << " MQTT: Retain: " << publish.getRetain();
255
256 if (publish.getQoS() > 2) {
257 LOG(ERROR) << connectionName << " MQTT: Received invalid QoS: " << publish.getQoS();
258 mqttContext->end(true);
259 deliver = false;
260 } else if (publish.getPacketIdentifier() == 0 && publish.getQoS() > 0) {
261 LOG(ERROR) << connectionName << " MQTT: Received QoS > 0 but no PackageIdentifier present";
262 mqttContext->end(true);
263 deliver = false;
264 } else {
265 switch (publish.getQoS()) {
266 case 1:
267 sendPuback(publish.getPacketIdentifier());
268
269 break;
270 case 2:
271 sendPubrec(publish.getPacketIdentifier());
272
273 if (session->publishPacketIdentifierSet.contains(publish.getPacketIdentifier())) {
274 deliver = false;
275 } else {
276 session->publishPacketIdentifierSet.insert(publish.getPacketIdentifier());
277 }
278
279 break;
280 }
281 }
282
283 return deliver;
284 }
285
286 void Mqtt::_onPuback(const iot::mqtt::packets::Puback& puback) {
287 if (puback.getPacketIdentifier() == 0) {
288 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
289 mqttContext->end(true);
290 } else {
291 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
292 << puback.getPacketIdentifier();
293 }
294
295 onPuback(puback);
296 }
297
298 void Mqtt::_onPubrec(const iot::mqtt::packets::Pubrec& pubrec) {
299 if (pubrec.getPacketIdentifier() == 0) {
300 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
301 mqttContext->end(true);
302 } else {
303 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
304 << pubrec.getPacketIdentifier();
305
306 session->publishMap.erase(pubrec.getPacketIdentifier());
307 session->pubrelPacketIdentifierSet.insert(pubrec.getPacketIdentifier());
308
309 sendPubrel(pubrec.getPacketIdentifier());
310 }
311
312 onPubrec(pubrec);
313 }
314
315 void Mqtt::_onPubrel(const iot::mqtt::packets::Pubrel& pubrel) {
316 if (pubrel.getPacketIdentifier() == 0) {
317 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
318 mqttContext->end(true);
319 } else {
320 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
321 << pubrel.getPacketIdentifier();
322
323 session->publishPacketIdentifierSet.erase(pubrel.getPacketIdentifier());
324
325 sendPubcomp(pubrel.getPacketIdentifier());
326 }
327
328 onPubrel(pubrel);
329 }
330
331 void Mqtt::_onPubcomp(const iot::mqtt::packets::Pubcomp& pubcomp) {
332 if (pubcomp.getPacketIdentifier() == 0) {
333 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
334 mqttContext->end(true);
335 } else {
336 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
337 << pubcomp.getPacketIdentifier();
338
339 session->publishMap.erase(pubcomp.getPacketIdentifier());
340 session->pubrelPacketIdentifierSet.erase(pubcomp.getPacketIdentifier());
341 }
342
343 onPubcomp(pubcomp);
344 }
345
346 void Mqtt::printVP(const iot::mqtt::ControlPacket& packet) const {
347 LOG(INFO) << connectionName << " MQTT: " << packet.getName() << " received: " << clientId;
348
349 const std::string hexString = toHexString(packet.serializeVP());
350 if (!hexString.empty()) {
351 LOG(TRACE) << connectionName << " MQTT: Received data (variable header and payload):\n" << hexString;
352 }
353 }
354
355 void Mqtt::printFixedHeader(const FixedHeader& fixedHeader) const {
356 LOG(INFO) << connectionName << " MQTT: ======================================================";
357
358 LOG(TRACE) << connectionName << " MQTT: Received data (fixed header):\n" << toHexString(fixedHeader.serialize());
359
360 LOG(DEBUG) << connectionName << " MQTT: Fixed Header: PacketType: 0x" << std::hex << std::setfill('0') << std::setw(2)
361 << static_cast<uint16_t>(fixedHeader.getType()) << " (" << iot::mqtt::mqttPackageName[fixedHeader.getType()] << ")";
362 LOG(DEBUG) << connectionName << " MQTT: PacketFlags: 0x" << std::hex << std::setfill('0') << std::setw(2)
363 << static_cast<uint16_t>(fixedHeader.getFlags()) << std::dec;
364 LOG(DEBUG) << connectionName << " MQTT: RemainingLength: " << fixedHeader.getRemainingLength();
365 }
366
367 std::string Mqtt::toHexString(const std::vector<char>& data) {
368 const std::string hexDump = utils::hexDump(data, 32);
369 return !hexDump.empty() ? std::string(32, ' ').append(hexDump) : "";
370 }
371
372 std::string Mqtt::toHexString(const std::string& data) {
373 return toHexString(std::vector<char>(data.begin(), data.end()));
374 }
375
376 uint16_t Mqtt::getPacketIdentifier() {
377 ++_packetIdentifier;
378
379 if (_packetIdentifier == 0) {
380 ++_packetIdentifier;
381 }
382
383 return _packetIdentifier;
384 }
385
386} // namespace iot::mqtt