SNode.C
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "Mqtt.h"
43
44#include "MqttContext.h"
45#include "core/socket/stream/SocketConnection.h"
46#include "iot/mqtt/ControlPacketDeserializer.h"
47#include "iot/mqtt/Session.h"
48#include "iot/mqtt/packets/Puback.h"
49#include "iot/mqtt/packets/Pubcomp.h"
50#include "iot/mqtt/packets/Publish.h"
51#include "iot/mqtt/packets/Pubrec.h"
52#include "iot/mqtt/packets/Pubrel.h"
53
54#ifndef DOXYGEN_SHOULD_SKIP_THIS
55
56#include "log/Logger.h"
57#include "utils/hexdump.h"
58
59#include <functional>
60#include <iomanip>
61#include <ios>
62#include <map>
63#include <set>
64
65#endif // DOXYGEN_SHOULD_SKIP_THIS
66
67namespace iot::mqtt {
68
69 Mqtt::Mqtt(const std::string& connectionName)
70 : connectionName(connectionName) {
71 }
72
73 Mqtt::Mqtt(const std::string& connectionName, const std::string& clientId)
74 : connectionName(connectionName)
75 , clientId(clientId) {
76 }
77
78 Mqtt::~Mqtt() {
79 if (controlPacketDeserializer != nullptr) {
82 }
83
85 }
86
87 void Mqtt::setMqttContext(MqttContext* mqttContext) {
88 this->mqttContext = mqttContext;
89 }
90
92 return mqttContext;
93 }
94
95 void Mqtt::onConnected() {
96 LOG(INFO) << "MQTT: Connected";
97 }
98
99 std::size_t Mqtt::onReceivedFromPeer() {
100 std::size_t consumed = 0;
101
102 switch (state) {
103 case 0:
107 break;
108 }
111 break;
112 }
114
116
118
119 if (controlPacketDeserializer == nullptr) {
120 LOG(DEBUG) << connectionName << " MQTT: Received packet-type is unavailable ... closing connection";
121
123 break;
124 }
126 LOG(DEBUG) << connectionName << " MQTT: Fixed header has error ... closing connection";
127
130
132 break;
133 }
134
135 state++;
136
137 [[fallthrough]];
138 case 1:
140
142 LOG(DEBUG) << connectionName << " MQTT: Control packet has error ... closing connection";
144
147
148 state = 0;
151
154
155 state = 0;
156
158 }
159
160 break;
161 }
162
163 return consumed;
164 }
165
167 LOG(INFO) << connectionName << " MQTT: Disconnected";
168 }
169
170 const std::string& Mqtt::getConnectionName() const {
171 return connectionName;
172 }
173
174 void Mqtt::initSession(Session* session, utils::Timeval keepAlive) {
175 this->session = session;
176
177 for (const auto& [packetIdentifier, publish] : session->outgoingPublishMap) {
178 LOG(INFO) << connectionName << " MQTT: PUBLISH Resend";
179
180 send(publish);
181 }
182
183 for (const uint16_t packetIdentifier : session->pubrelPacketIdentifierSet) {
184 LOG(INFO) << connectionName << " MQTT: PUBREL Resend";
185
186 sendPubrel(packetIdentifier);
187 }
188
189 if (keepAlive > 0) {
190 keepAlive *= 1.5;
191
192 LOG(INFO) << connectionName << " MQTT: Keep alive initialized with: " << keepAlive;
193
195 [this, keepAlive]() {
196 LOG(ERROR) << connectionName << " MQTT: Keep-alive timer expired. Interval was: " << keepAlive;
198 },
199 keepAlive);
200 }
201
203 }
204
205 void Mqtt::send(const ControlPacket& controlPacket) const {
206 LOG(INFO) << connectionName << " MQTT: " << controlPacket.getName() << " send: " << clientId;
207
208 send(controlPacket.serialize());
209 }
210
211 void Mqtt::send(const std::vector<char>& data) const {
212 LOG(TRACE) << connectionName << " MQTT: Send data (full message):\n" << toHexString(data);
213
214 mqttContext->send(data.data(), data.size());
215 }
216
217 void Mqtt::sendPublish(const std::string& topic, const std::string& message, uint8_t qoS, bool retain) const {
218 const uint16_t packetIdentifier = qoS != 0 ? getPacketIdentifier() : 0;
219
220 send(iot::mqtt::packets::Publish(packetIdentifier, topic, message, qoS, false, retain));
221
222 LOG(INFO) << connectionName << " MQTT: Topic: " << topic;
223 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(message);
224 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(qoS);
225 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << packetIdentifier;
226 LOG(DEBUG) << connectionName << " MQTT: DUP: " << false;
227 LOG(DEBUG) << connectionName << " MQTT: Retain: " << retain;
228
229 if (qoS >= 1) {
230 session->outgoingPublishMap.insert_or_assign(packetIdentifier,
231 iot::mqtt::packets::Publish(packetIdentifier, topic, message, qoS, true, retain));
232 }
233 }
234
235 void Mqtt::sendPuback(uint16_t packetIdentifier) const { // Server & Client
236 send(iot::mqtt::packets::Puback(packetIdentifier));
237 }
238
239 void Mqtt::sendPubrec(uint16_t packetIdentifier) const { // Server & Client
240 send(iot::mqtt::packets::Pubrec(packetIdentifier));
241 }
242
243 void Mqtt::sendPubrel(uint16_t packetIdentifier) const { // Server & Client
244 send(iot::mqtt::packets::Pubrel(packetIdentifier));
245 }
246
247 void Mqtt::sendPubcomp(uint16_t packetIdentifier) const { // Server & Client
248 send(iot::mqtt::packets::Pubcomp(packetIdentifier));
249 }
250
251 void Mqtt::onPublish([[maybe_unused]] const packets::Publish& publish) {
252 }
253
254 void Mqtt::onPuback([[maybe_unused]] const iot::mqtt::packets::Puback& puback) {
255 }
256
257 void Mqtt::onPubrec([[maybe_unused]] const iot::mqtt::packets::Pubrec& pubrec) {
258 }
259
260 void Mqtt::onPubrel([[maybe_unused]] const iot::mqtt::packets::Pubrel& pubrel) {
261 }
262
263 void Mqtt::onPubcomp([[maybe_unused]] const iot::mqtt::packets::Pubcomp& pubcomp) {
264 }
265
266 bool Mqtt::_onPublish(const iot::mqtt::packets::Publish& publish) {
267 bool deliver = true;
268
269 LOG(INFO) << connectionName << " MQTT: Topic: " << publish.getTopic();
270 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(publish.getMessage());
271 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(publish.getQoS());
272 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << publish.getPacketIdentifier();
273 LOG(DEBUG) << connectionName << " MQTT: DUP: " << publish.getDup();
274 LOG(DEBUG) << connectionName << " MQTT: Retain: " << publish.getRetain();
275
276 if (publish.getQoS() > 2) {
277 LOG(ERROR) << connectionName << " MQTT: Received invalid QoS: " << publish.getQoS();
279 deliver = false;
280 } else if (publish.getPacketIdentifier() == 0 && publish.getQoS() > 0) {
281 LOG(ERROR) << connectionName << " MQTT: Received QoS > 0 but no PackageIdentifier present";
283 deliver = false;
284 } else if (publish.getQoS() == 0 && publish.getDup()) {
285 LOG(ERROR) << connectionName << " MQTT: Received QoS == 0 but dup is set";
287 deliver = false;
288 } else {
289 switch (publish.getQoS()) {
290 case 1:
292
293 break;
294 case 2:
296
297 deliver = false;
298
299 const uint16_t pid = publish.getPacketIdentifier();
300
301 if (session->pubcompPacketIdentifierSet.contains(pid)) {
302 if (!publish.getDup()) {
304 } else {
305 LOG(WARNING) << connectionName << " MQTT: Duplicate QoS2 PUBLISH after PUBCOMP for PacketIdentifier: " << pid;
306 break;
307 }
308 }
309
310 if (session->incomingPublishMap.contains(pid)) {
311 LOG(INFO) << connectionName << " MQTT: Duplicate QoS2 PUBLISH suppressed for PacketIdentifier: " << pid;
312 } else {
313 session->incomingPublishMap.emplace(pid, publish);
314 }
315
316 break;
317 }
318 }
319
320 return deliver;
321 }
322
323 void Mqtt::_onPuback(const iot::mqtt::packets::Puback& puback) {
324 if (puback.getPacketIdentifier() == 0) {
325 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
327 } else {
328 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
329 << puback.getPacketIdentifier() << std::dec;
330
332 }
333
334 onPuback(puback);
335 }
336
337 void Mqtt::_onPubrec(const iot::mqtt::packets::Pubrec& pubrec) {
338 if (pubrec.getPacketIdentifier() == 0) {
339 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
341 } else {
342 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
343 << pubrec.getPacketIdentifier() << std::dec;
344
347
349 }
350
351 onPubrec(pubrec);
352 }
353
354 void Mqtt::_onPubrel(const iot::mqtt::packets::Pubrel& pubrel) {
355 if (pubrel.getPacketIdentifier() == 0) {
356 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
358 } else {
359 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
360 << pubrel.getPacketIdentifier() << std::dec;
361
362 const uint16_t pid = pubrel.getPacketIdentifier();
363
364 if (session->incomingPublishMap.contains(pid)) {
365 LOG(INFO) << connectionName << " MQTT: QoS2 PUBREL received. Deliver publish: " << pid;
366
368
371 } else if (session->pubcompPacketIdentifierSet.contains(pid)) {
372 LOG(INFO) << connectionName << " MQTT: Duplicate QoS2 PUBREL for completed PacketIdentifier: " << pid;
373 } else {
374 LOG(WARNING) << connectionName << " MQTT: QoS2 PUBREL received for unknown PacketIdentifier: " << pid;
375
377 }
378
379 sendPubcomp(pid);
380 }
381
382 onPubrel(pubrel);
383 }
384
385 void Mqtt::_onPubcomp(const iot::mqtt::packets::Pubcomp& pubcomp) {
386 if (pubcomp.getPacketIdentifier() == 0) {
387 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
389 } else {
390 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
391 << pubcomp.getPacketIdentifier() << std::dec;
392
395 }
396
397 onPubcomp(pubcomp);
398 }
399
400 void Mqtt::printVP(const iot::mqtt::ControlPacket& packet) const {
401 LOG(INFO) << connectionName << " MQTT: " << packet.getName() << " received: " << clientId;
402
403 const std::string hexString = toHexString(packet.serializeVP());
404 if (!hexString.empty()) {
405 LOG(TRACE) << connectionName << " MQTT: Received data (variable header and payload):\n" << hexString;
406 }
407 }
408
409 void Mqtt::printFixedHeader(const FixedHeader& fixedHeader) const {
410 LOG(INFO) << connectionName << " MQTT: ====================================";
411
412 LOG(TRACE) << connectionName << " MQTT: Received data (fixed header):\n" << toHexString(fixedHeader.serialize());
413
414 LOG(DEBUG) << connectionName << " MQTT: Fixed Header: PacketType: 0x" << std::hex << std::setfill('0') << std::setw(2)
415 << static_cast<uint16_t>(fixedHeader.getType()) << " (" << iot::mqtt::mqttPackageName[fixedHeader.getType()] << ")"
416 << std::dec;
417 LOG(DEBUG) << connectionName << " MQTT: PacketFlags: 0x" << std::hex << std::setfill('0') << std::setw(2)
418 << static_cast<uint16_t>(fixedHeader.getFlags()) << std::dec;
419 LOG(DEBUG) << connectionName << " MQTT: RemainingLength: " << fixedHeader.getRemainingLength();
420 }
421
422 std::string Mqtt::toHexString(const std::vector<char>& data) {
423 const std::string hexDump = utils::hexDump(data, 32);
424 return !hexDump.empty() ? std::string(32, ' ').append(hexDump) : "";
425 }
426
427 std::string Mqtt::toHexString(const std::string& data) {
428 return toHexString(std::vector<char>(data.begin(), data.end()));
429 }
430
431 uint16_t Mqtt::getPacketIdentifier() const {
432 do {
434 if (_packetIdentifier == 0) {
436 }
438
439 return _packetIdentifier;
440 }
441
442} // namespace iot::mqtt
void restart()
Definition Timer.cpp:90
void cancel()
Definition Timer.cpp:84
virtual void setTimeout(const utils::Timeval &timeout)=0
Timer & operator=(Timer &&timer) noexcept=default
static Timer singleshotTimer(const std::function< void()> &dispatcher, const utils::Timeval &timeout)
Definition Timer.cpp:57
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
std::vector< char > serialize() const
const std::string & getName() const
virtual std::vector< char > serializeVP() const =0
uint8_t getType() const
uint32_t getRemainingLength() const
uint8_t getFlags() const
std::vector< char > serialize() const
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
virtual void send(const char *chunk, std::size_t chunklen)=0
virtual core::socket::stream::SocketConnection * getSocketConnection() const =0
virtual void close()=0
MqttContext * mqttContext
Definition Mqtt.h:160
static std::string toHexString(const std::string &data)
Definition Mqtt.cpp:427
const std::string & getConnectionName() const
Definition Mqtt.cpp:170
virtual void onConnected()
Definition Mqtt.cpp:95
static std::string toHexString(const std::vector< char > &data)
Definition Mqtt.cpp:422
virtual void onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
Definition Mqtt.cpp:263
virtual void onPublish(const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:251
core::timer::Timer keepAliveTimer
Definition Mqtt.h:153
iot::mqtt::FixedHeader fixedHeader
Definition Mqtt.h:148
const MqttContext * getMqttContext() const
Definition Mqtt.cpp:91
std::size_t onReceivedFromPeer()
Definition Mqtt.cpp:99
void sendPublish(const std::string &topic, const std::string &message, uint8_t qoS, bool retain) const
Definition Mqtt.cpp:217
virtual void onDisconnected()
Definition Mqtt.cpp:166
void send(const std::vector< char > &data) const
Definition Mqtt.cpp:211
void setMqttContext(MqttContext *mqttContext)
Definition Mqtt.cpp:87
virtual void deliverPublish(const iot::mqtt::packets::Publish &publish)=0
void sendPuback(uint16_t packetIdentifier) const
Definition Mqtt.cpp:235
virtual iot::mqtt::ControlPacketDeserializer * createControlPacketDeserializer(iot::mqtt::FixedHeader &staticHeader) const =0
virtual void onPuback(const iot::mqtt::packets::Puback &puback)
Definition Mqtt.cpp:254
std::string connectionName
Definition Mqtt.h:144
void _onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
Definition Mqtt.cpp:337
virtual void deliverPacket(iot::mqtt::ControlPacketDeserializer *controlPacketDeserializer)=0
Mqtt(const std::string &connectionName, const std::string &clientId)
Definition Mqtt.cpp:73
void sendPubrel(uint16_t packetIdentifier) const
Definition Mqtt.cpp:243
bool _onPublish(const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:266
virtual ~Mqtt()
Definition Mqtt.cpp:78
void printFixedHeader(const iot::mqtt::FixedHeader &fixedHeader) const
Definition Mqtt.cpp:409
uint16_t _packetIdentifier
Definition Mqtt.h:151
Session * session
Definition Mqtt.h:157
void _onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
Definition Mqtt.cpp:354
virtual void onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
Definition Mqtt.cpp:257
virtual void onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
Definition Mqtt.cpp:260
void send(const iot::mqtt::ControlPacket &controlPacket) const
Definition Mqtt.cpp:205
std::string clientId
Definition Mqtt.h:145
Mqtt(const std::string &connectionName)
Definition Mqtt.cpp:69
void initSession(Session *session, utils::Timeval keepAlive)
Definition Mqtt.cpp:174
void printVP(const iot::mqtt::ControlPacket &packet) const
Definition Mqtt.cpp:400
iot::mqtt::ControlPacketDeserializer * controlPacketDeserializer
Definition Mqtt.h:149
void sendPubcomp(uint16_t packetIdentifier) const
Definition Mqtt.cpp:247
void sendPubrec(uint16_t packetIdentifier) const
Definition Mqtt.cpp:239
uint16_t getPacketIdentifier() const
Definition Mqtt.cpp:431
void _onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
Definition Mqtt.cpp:385
void _onPuback(const iot::mqtt::packets::Puback &puback)
Definition Mqtt.cpp:323
std::map< uint16_t, iot::mqtt::packets::Publish > outgoingPublishMap
Definition Session.h:79
std::map< uint16_t, iot::mqtt::packets::Publish > incomingPublishMap
Definition Session.h:83
std::set< uint16_t > pubcompPacketIdentifierSet
Definition Session.h:84
std::set< uint16_t > pubrelPacketIdentifierSet
Definition Session.h:80
uint16_t getPacketIdentifier() const
Definition Puback.cpp:66
Puback(uint16_t packetIdentifier)
Definition Puback.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubcomp.cpp:66
Pubcomp(uint16_t packetIdentifier)
Definition Pubcomp.cpp:57
uint8_t getQoS() const
Definition Publish.cpp:85
std::string getTopic() const
Definition Publish.cpp:93
std::string getMessage() const
Definition Publish.cpp:97
Publish(uint16_t packetIdentifier, const std::string &topic, const std::string &message, uint8_t qoS, bool dup, bool retain)
Definition Publish.cpp:56
uint16_t getPacketIdentifier() const
Definition Publish.cpp:89
Pubrec(uint16_t packetIdentifier)
Definition Pubrec.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubrec.cpp:66
Pubrel(uint16_t packetIdentifier)
Definition Pubrel.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubrel.cpp:66
bool operator>(const Timeval &timeVal) const
Definition Timeval.cpp:149
Timeval & operator*=(double mul)
Definition Timeval.cpp:135
Definition Timer.h:59
const std::vector< std::string > mqttPackageName
Definition Config.h:59
std::string hexDump(const std::vector< char > &bytes, int prefixLength, bool prefixAtFirstLine)
Definition hexdump.cpp:58