SNode.C
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "Mqtt.h"
43
44#include "MqttContext.h"
45#include "core/socket/stream/SocketConnection.h"
46#include "iot/mqtt/ControlPacketDeserializer.h"
47#include "iot/mqtt/Session.h"
48#include "iot/mqtt/packets/Puback.h"
49#include "iot/mqtt/packets/Pubcomp.h"
50#include "iot/mqtt/packets/Publish.h"
51#include "iot/mqtt/packets/Pubrec.h"
52#include "iot/mqtt/packets/Pubrel.h"
53
54#ifndef DOXYGEN_SHOULD_SKIP_THIS
55
56#include "log/Logger.h"
57#include "utils/hexdump.h"
58
59#include <functional>
60#include <iomanip>
61#include <map>
62#include <set>
63
64#endif // DOXYGEN_SHOULD_SKIP_THIS
65
66namespace iot::mqtt {
67
68 Mqtt::Mqtt(const std::string& connectionName)
69 : connectionName(connectionName) {
70 }
71
72 Mqtt::Mqtt(const std::string& connectionName, const std::string& clientId)
73 : connectionName(connectionName)
74 , clientId(clientId) {
75 }
76
77 Mqtt::~Mqtt() {
78 if (controlPacketDeserializer != nullptr) {
81 }
82
84 }
85
86 void Mqtt::setMqttContext(MqttContext* mqttContext) {
87 this->mqttContext = mqttContext;
88 }
89
91 return mqttContext;
92 }
93
94 void Mqtt::onConnected() {
95 LOG(INFO) << "MQTT: Connected";
96 }
97
98 std::size_t Mqtt::onReceivedFromPeer() {
99 std::size_t consumed = 0;
100
101 switch (state) {
102 case 0:
104
106 break;
107 }
110 break;
111 }
113
115
117
118 if (controlPacketDeserializer == nullptr) {
119 LOG(DEBUG) << connectionName << " MQTT: Received packet-type is unavailable ... closing connection";
120
122 break;
123 }
125 LOG(DEBUG) << connectionName << " MQTT: Fixed header has error ... closing connection";
126
129
131 break;
132 }
133
134 state++;
135
136 [[fallthrough]];
137 case 1:
139
141 LOG(DEBUG) << connectionName << " MQTT: Control packet has error ... closing connection";
143
146
147 state = 0;
150
153
154 state = 0;
155
157 }
158
159 break;
160 }
161
162 return consumed;
163 }
164
166 LOG(INFO) << connectionName << " MQTT: Disconnected";
167 }
168
169 const std::string& Mqtt::getConnectionName() const {
170 return connectionName;
171 }
172
173 void Mqtt::initSession(Session* session, utils::Timeval keepAlive) {
174 this->session = session;
175
176 for (const auto& [packetIdentifier, publish] : session->outgoingPublishMap) {
177 LOG(INFO) << connectionName << " MQTT: PUBLISH Resend";
178
179 send(publish);
180 }
181
182 for (const uint16_t packetIdentifier : session->pubrelPacketIdentifierSet) {
183 LOG(INFO) << connectionName << " MQTT: PUBREL Resend";
184
185 sendPubrel(packetIdentifier);
186 }
187
188 if (keepAlive > 0) {
189 keepAlive *= 1.5;
190
191 LOG(INFO) << connectionName << " MQTT: Keep alive initialized with: " << keepAlive;
192
194 [this, keepAlive]() {
195 LOG(ERROR) << connectionName << " MQTT: Keep-alive timer expired. Interval was: " << keepAlive;
197 },
198 keepAlive);
199 }
200
202 }
203
204 void Mqtt::send(const ControlPacket& controlPacket) const {
205 LOG(INFO) << connectionName << " MQTT: " << controlPacket.getName() << " send: " << clientId;
206
207 send(controlPacket.serialize());
208 }
209
210 void Mqtt::send(const std::vector<char>& data) const {
211 LOG(TRACE) << connectionName << " MQTT: Send data (full message):\n" << toHexString(data);
212
213 mqttContext->send(data.data(), data.size());
214 }
215
216 void Mqtt::sendPublish(const std::string& topic, const std::string& message, uint8_t qoS, bool retain) const {
217 const uint16_t packetIdentifier = qoS != 0 ? getPacketIdentifier() : 0;
218
219 send(iot::mqtt::packets::Publish(packetIdentifier, topic, message, qoS, false, retain));
220
221 LOG(INFO) << connectionName << " MQTT: Topic: " << topic;
222 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(message);
223 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(qoS);
224 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << packetIdentifier;
225 LOG(DEBUG) << connectionName << " MQTT: DUP: " << false;
226 LOG(DEBUG) << connectionName << " MQTT: Retain: " << retain;
227
228 if (qoS >= 1) {
229 session->outgoingPublishMap.insert_or_assign(packetIdentifier,
230 iot::mqtt::packets::Publish(packetIdentifier, topic, message, qoS, true, retain));
231 }
232 }
233
234 void Mqtt::sendPuback(uint16_t packetIdentifier) const { // Server & Client
235 send(iot::mqtt::packets::Puback(packetIdentifier));
236 }
237
238 void Mqtt::sendPubrec(uint16_t packetIdentifier) const { // Server & Client
239 send(iot::mqtt::packets::Pubrec(packetIdentifier));
240 }
241
242 void Mqtt::sendPubrel(uint16_t packetIdentifier) const { // Server & Client
243 send(iot::mqtt::packets::Pubrel(packetIdentifier));
244 }
245
246 void Mqtt::sendPubcomp(uint16_t packetIdentifier) const { // Server & Client
247 send(iot::mqtt::packets::Pubcomp(packetIdentifier));
248 }
249
250 void Mqtt::onPublish([[maybe_unused]] const packets::Publish& publish) {
251 }
252
253 void Mqtt::onPuback([[maybe_unused]] const iot::mqtt::packets::Puback& puback) {
254 }
255
256 void Mqtt::onPubrec([[maybe_unused]] const iot::mqtt::packets::Pubrec& pubrec) {
257 }
258
259 void Mqtt::onPubrel([[maybe_unused]] const iot::mqtt::packets::Pubrel& pubrel) {
260 }
261
262 void Mqtt::onPubcomp([[maybe_unused]] const iot::mqtt::packets::Pubcomp& pubcomp) {
263 }
264
265 bool Mqtt::_onPublish(const iot::mqtt::packets::Publish& publish) {
266 bool deliver = true;
267
268 LOG(INFO) << connectionName << " MQTT: Topic: " << publish.getTopic();
269 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(publish.getMessage());
270 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(publish.getQoS());
271 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << publish.getPacketIdentifier();
272 LOG(DEBUG) << connectionName << " MQTT: DUP: " << publish.getDup();
273 LOG(DEBUG) << connectionName << " MQTT: Retain: " << publish.getRetain();
274
275 if (publish.getQoS() > 2) {
276 LOG(ERROR) << connectionName << " MQTT: Received invalid QoS: " << publish.getQoS();
278 deliver = false;
279 } else if (publish.getPacketIdentifier() == 0 && publish.getQoS() > 0) {
280 LOG(ERROR) << connectionName << " MQTT: Received QoS > 0 but no PackageIdentifier present";
282 deliver = false;
283 } else if (publish.getQoS() == 0 && publish.getDup()) {
284 LOG(ERROR) << connectionName << " MQTT: Received QoS == 0 but dup is set";
286 deliver = false;
287 } else {
288 switch (publish.getQoS()) {
289 case 1:
291
292 break;
293 case 2:
295
296 deliver = false;
297
298 const uint16_t pid = publish.getPacketIdentifier();
299
300 if (session->pubcompPacketIdentifierSet.contains(pid)) {
301 if (!publish.getDup()) {
303 } else {
304 LOG(WARNING) << connectionName << " MQTT: Duplicate QoS2 PUBLISH after PUBCOMP for PacketIdentifier: " << pid;
305 break;
306 }
307 }
308
309 if (session->incomingPublishMap.contains(pid)) {
310 LOG(INFO) << connectionName << " MQTT: Duplicate QoS2 PUBLISH suppressed for PacketIdentifier: " << pid;
311 } else {
312 session->incomingPublishMap.emplace(pid, publish);
313 }
314
315 break;
316 }
317 }
318
319 return deliver;
320 }
321
322 void Mqtt::_onPuback(const iot::mqtt::packets::Puback& puback) {
323 if (puback.getPacketIdentifier() == 0) {
324 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
326 } else {
327 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
328 << puback.getPacketIdentifier() << std::dec;
329
331 }
332
333 onPuback(puback);
334 }
335
336 void Mqtt::_onPubrec(const iot::mqtt::packets::Pubrec& pubrec) {
337 if (pubrec.getPacketIdentifier() == 0) {
338 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
340 } else {
341 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
342 << pubrec.getPacketIdentifier() << std::dec;
343
346
348 }
349
350 onPubrec(pubrec);
351 }
352
353 void Mqtt::_onPubrel(const iot::mqtt::packets::Pubrel& pubrel) {
354 if (pubrel.getPacketIdentifier() == 0) {
355 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
357 } else {
358 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
359 << pubrel.getPacketIdentifier() << std::dec;
360
361 const uint16_t pid = pubrel.getPacketIdentifier();
362
363 if (session->incomingPublishMap.contains(pid)) {
364 LOG(INFO) << connectionName << " MQTT: QoS2 PUBREL received. Deliver publish: " << pid;
365
367
370 } else if (session->pubcompPacketIdentifierSet.contains(pid)) {
371 LOG(INFO) << connectionName << " MQTT: Duplicate QoS2 PUBREL for completed PacketIdentifier: " << pid;
372 } else {
373 LOG(WARNING) << connectionName << " MQTT: QoS2 PUBREL received for unknown PacketIdentifier: " << pid;
374
376 }
377
378 sendPubcomp(pid);
379 }
380
381 onPubrel(pubrel);
382 }
383
384 void Mqtt::_onPubcomp(const iot::mqtt::packets::Pubcomp& pubcomp) {
385 if (pubcomp.getPacketIdentifier() == 0) {
386 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
388 } else {
389 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
390 << pubcomp.getPacketIdentifier() << std::dec;
391
394 }
395
396 onPubcomp(pubcomp);
397 }
398
399 void Mqtt::printVP(const iot::mqtt::ControlPacket& packet) const {
400 LOG(INFO) << connectionName << " MQTT: " << packet.getName() << " received: " << clientId;
401
402 const std::string hexString = toHexString(packet.serializeVP());
403 if (!hexString.empty()) {
404 LOG(TRACE) << connectionName << " MQTT: Received data (variable header and payload):\n" << hexString;
405 }
406 }
407
408 void Mqtt::printFixedHeader(const FixedHeader& fixedHeader) const {
409 LOG(INFO) << connectionName << " MQTT: ====================================";
410
411 LOG(TRACE) << connectionName << " MQTT: Received data (fixed header):\n" << toHexString(fixedHeader.serialize());
412
413 LOG(DEBUG) << connectionName << " MQTT: Fixed Header: PacketType: 0x" << std::hex << std::setfill('0') << std::setw(2)
414 << static_cast<uint16_t>(fixedHeader.getType()) << " (" << iot::mqtt::mqttPackageName[fixedHeader.getType()] << ")"
415 << std::dec;
416 LOG(DEBUG) << connectionName << " MQTT: PacketFlags: 0x" << std::hex << std::setfill('0') << std::setw(2)
417 << static_cast<uint16_t>(fixedHeader.getFlags()) << std::dec;
418 LOG(DEBUG) << connectionName << " MQTT: RemainingLength: " << fixedHeader.getRemainingLength();
419 }
420
421 std::string Mqtt::toHexString(const std::vector<char>& data) {
422 const std::string hexDump = utils::hexDump(data, 32);
423 return !hexDump.empty() ? std::string(32, ' ').append(hexDump) : "";
424 }
425
426 std::string Mqtt::toHexString(const std::string& data) {
427 return toHexString(std::vector<char>(data.begin(), data.end()));
428 }
429
430 uint16_t Mqtt::getPacketIdentifier() const {
431 do {
433 if (_packetIdentifier == 0) {
435 }
437
438 return _packetIdentifier;
439 }
440
441} // namespace iot::mqtt
#define LOG(level)
Definition Logger.h:148
void restart()
Definition Timer.cpp:90
void cancel()
Definition Timer.cpp:84
virtual void setTimeout(const utils::Timeval &timeout)=0
Timer & operator=(Timer &&timer) noexcept=default
static Timer singleshotTimer(const std::function< void()> &dispatcher, const utils::Timeval &timeout)
Definition Timer.cpp:57
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
std::vector< char > serialize() const
const std::string & getName() const
virtual std::vector< char > serializeVP() const =0
uint8_t getType() const
uint32_t getRemainingLength() const
uint8_t getFlags() const
std::vector< char > serialize() const
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
virtual void send(const char *chunk, std::size_t chunklen)=0
virtual core::socket::stream::SocketConnection * getSocketConnection() const =0
virtual void close()=0
MqttContext * mqttContext
Definition Mqtt.h:160
static std::string toHexString(const std::string &data)
Definition Mqtt.cpp:426
const std::string & getConnectionName() const
Definition Mqtt.cpp:169
virtual void onConnected()
Definition Mqtt.cpp:94
static std::string toHexString(const std::vector< char > &data)
Definition Mqtt.cpp:421
virtual void onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
Definition Mqtt.cpp:262
virtual void onPublish(const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:250
core::timer::Timer keepAliveTimer
Definition Mqtt.h:153
iot::mqtt::FixedHeader fixedHeader
Definition Mqtt.h:148
const MqttContext * getMqttContext() const
Definition Mqtt.cpp:90
std::size_t onReceivedFromPeer()
Definition Mqtt.cpp:98
void sendPublish(const std::string &topic, const std::string &message, uint8_t qoS, bool retain) const
Definition Mqtt.cpp:216
virtual void onDisconnected()
Definition Mqtt.cpp:165
void send(const std::vector< char > &data) const
Definition Mqtt.cpp:210
void setMqttContext(MqttContext *mqttContext)
Definition Mqtt.cpp:86
void sendPuback(uint16_t packetIdentifier) const
Definition Mqtt.cpp:234
virtual iot::mqtt::ControlPacketDeserializer * createControlPacketDeserializer(iot::mqtt::FixedHeader &staticHeader) const =0
virtual void onPuback(const iot::mqtt::packets::Puback &puback)
Definition Mqtt.cpp:253
std::string connectionName
Definition Mqtt.h:144
void _onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
Definition Mqtt.cpp:336
virtual void deliverPacket(iot::mqtt::ControlPacketDeserializer *controlPacketDeserializer)=0
Mqtt(const std::string &connectionName, const std::string &clientId)
Definition Mqtt.cpp:72
void sendPubrel(uint16_t packetIdentifier) const
Definition Mqtt.cpp:242
bool _onPublish(const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:265
virtual ~Mqtt()
Definition Mqtt.cpp:77
void printFixedHeader(const iot::mqtt::FixedHeader &fixedHeader) const
Definition Mqtt.cpp:408
uint16_t _packetIdentifier
Definition Mqtt.h:151
Session * session
Definition Mqtt.h:157
void _onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
Definition Mqtt.cpp:353
virtual void onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
Definition Mqtt.cpp:256
virtual void onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
Definition Mqtt.cpp:259
void send(const iot::mqtt::ControlPacket &controlPacket) const
Definition Mqtt.cpp:204
std::string clientId
Definition Mqtt.h:145
Mqtt(const std::string &connectionName)
Definition Mqtt.cpp:68
void initSession(Session *session, utils::Timeval keepAlive)
Definition Mqtt.cpp:173
void printVP(const iot::mqtt::ControlPacket &packet) const
Definition Mqtt.cpp:399
iot::mqtt::ControlPacketDeserializer * controlPacketDeserializer
Definition Mqtt.h:149
void sendPubcomp(uint16_t packetIdentifier) const
Definition Mqtt.cpp:246
void sendPubrec(uint16_t packetIdentifier) const
Definition Mqtt.cpp:238
uint16_t getPacketIdentifier() const
Definition Mqtt.cpp:430
virtual void distributePublish(const iot::mqtt::packets::Publish &publish)=0
void _onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
Definition Mqtt.cpp:384
void _onPuback(const iot::mqtt::packets::Puback &puback)
Definition Mqtt.cpp:322
std::map< uint16_t, iot::mqtt::packets::Publish > outgoingPublishMap
Definition Session.h:79
std::map< uint16_t, iot::mqtt::packets::Publish > incomingPublishMap
Definition Session.h:83
std::set< uint16_t > pubcompPacketIdentifierSet
Definition Session.h:84
std::set< uint16_t > pubrelPacketIdentifierSet
Definition Session.h:80
uint16_t getPacketIdentifier() const
Definition Puback.cpp:66
Puback(uint16_t packetIdentifier)
Definition Puback.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubcomp.cpp:66
Pubcomp(uint16_t packetIdentifier)
Definition Pubcomp.cpp:57
uint8_t getQoS() const
Definition Publish.cpp:85
std::string getTopic() const
Definition Publish.cpp:93
std::string getMessage() const
Definition Publish.cpp:97
Publish(uint16_t packetIdentifier, const std::string &topic, const std::string &message, uint8_t qoS, bool dup, bool retain)
Definition Publish.cpp:56
uint16_t getPacketIdentifier() const
Definition Publish.cpp:89
Pubrec(uint16_t packetIdentifier)
Definition Pubrec.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubrec.cpp:66
Pubrel(uint16_t packetIdentifier)
Definition Pubrel.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubrel.cpp:66
LogMessage(Level level, int verboseLevel=-1, bool withErrno=false)
Definition Logger.cpp:280
bool operator>(const Timeval &timeVal) const
Definition Timeval.cpp:149
Timeval & operator*=(double mul)
Definition Timeval.cpp:135
Definition Timer.h:59
const std::vector< std::string > mqttPackageName
std::string hexDump(const std::vector< char > &bytes, int prefixLength, bool prefixAtFirstLine)
Definition hexdump.cpp:58