SNode.C
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - A Slim Toolkit for Network Communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "Mqtt.h"
43
44#include "MqttContext.h"
45#include "core/socket/stream/SocketConnection.h"
46#include "iot/mqtt/ControlPacketDeserializer.h"
47#include "iot/mqtt/Session.h"
48#include "iot/mqtt/packets/Puback.h"
49#include "iot/mqtt/packets/Pubcomp.h"
50#include "iot/mqtt/packets/Publish.h"
51#include "iot/mqtt/packets/Pubrec.h"
52#include "iot/mqtt/packets/Pubrel.h"
53
54#ifndef DOXYGEN_SHOULD_SKIP_THIS
55
56#include "log/Logger.h"
57#include "utils/hexdump.h"
58
59#include <functional>
60#include <iomanip>
61#include <ios>
62#include <map>
63#include <set>
64
65#endif // DOXYGEN_SHOULD_SKIP_THIS
66
67namespace iot::mqtt {
68
69 Mqtt::Mqtt(const std::string& connectionName)
70 : connectionName(connectionName) {
71 }
72
73 Mqtt::Mqtt(const std::string& connectionName, const std::string& clientId)
74 : connectionName(connectionName)
75 , clientId(clientId) {
76 }
77
/*
 * Destructor.
 *
 * NOTE(review): this listing is incomplete here. The embedded listing numbers
 * jump from 79 to 82 and from 83 to 85, so the body of the null check below
 * and one further statement are not visible. Presumably the deserializer is
 * released and the keep-alive timer is stopped — confirm against the real
 * source before relying on this.
 */
78 Mqtt::~Mqtt() {
// Only act when a control packet deserializer is currently held.
79 if (controlPacketDeserializer != nullptr) {
82 }
83
85 }
86
87 void Mqtt::setMqttContext(MqttContext* mqttContext) {
88 this->mqttContext = mqttContext;
89 }
90
// NOTE(review): the signature line (listing line 91) is missing from this
// listing. According to the cross-reference at the end of the page it is
// `const MqttContext* Mqtt::getMqttContext() const`.
// Returns the stored mqttContext pointer unchanged.
92 return mqttContext;
93 }
94
95 void Mqtt::onConnected() {
96 LOG(INFO) << "MQTT: Connected";
97 }
98
/*
 * Receive handler driven by the member `state`; returns the local `consumed`.
 *
 * Visible structure:
 *  - state 0: fixed-header stage. Logs and breaks when
 *    controlPacketDeserializer is nullptr ("packet-type is unavailable") or
 *    when the fixed header has an error; otherwise increments `state` and
 *    falls through.
 *  - state 1: control-packet stage. Logs when the control packet has an error
 *    and resets `state` to 0.
 *
 * NOTE(review): many statements are missing from this listing (the embedded
 * listing numbers have gaps, e.g. 104-106, 109-110, 113, 115, 117, 122, 125,
 * 128-129, 131, 139, 141, 143, 145-146, 149-150, 152-153, 157). In particular
 * the statements that update `consumed`, create/delete the deserializer and
 * close the connection are not visible. Do not edit this function from this
 * listing alone — consult the real source.
 */
99 std::size_t Mqtt::onReceivedFromPeer() {
100 std::size_t consumed = 0;
101
102 switch (state) {
103 case 0:
107 break;
108 }
111 break;
112 }
114
116
118
// No deserializer is available for the received packet type.
119 if (controlPacketDeserializer == nullptr) {
120 LOG(DEBUG) << connectionName << " MQTT: Received packet-type is unavailable ... closing connection";
121
123 break;
124 }
126 LOG(DEBUG) << connectionName << " MQTT: Fixed header has error ... closing connection";
127
130
132 break;
133 }
134
// Fixed-header stage finished: advance to the control-packet stage.
135 state++;
136
137 [[fallthrough]];
138 case 1:
140
142 LOG(DEBUG) << connectionName << " MQTT: Control packet has error ... closing connection";
144
147
// Both visible paths of this stage return to the fixed-header stage.
148 state = 0;
151
154
155 state = 0;
156
158 }
159
160 break;
161 }
162
163 return consumed;
164 }
165
// NOTE(review): the signature line (listing line 166) is missing from this
// listing. According to the cross-reference at the end of the page it is the
// virtual `void Mqtt::onDisconnected()`.
// Writes an INFO log line prefixed with the connection name.
167 LOG(INFO) << connectionName << " MQTT: Disconnected";
168 }
169
170 const std::string& Mqtt::getConnectionName() const {
171 return connectionName;
172 }
173
/*
 * Bind a session to this object and replay its pending packets.
 *
 * session   - stored in the `session` member; its publishMap and
 *             pubrelPacketIdentifierSet are iterated right away, so it is
 *             dereferenced without a null check.
 * keepAlive - taken by value; when greater than 0 it is scaled by 1.5 before
 *             it is logged and handed on as the timeout.
 *
 * NOTE(review): listing lines 194, 197 and 202 are missing from this listing.
 * Line 194 presumably starts a timer assignment (keepAliveTimer /
 * Timer::singleshotTimer appear in the page's cross-reference) whose callback
 * and timeout are the lambda and `keepAlive` below; line 197 is presumably
 * the action taken on expiry. Confirm against the real source.
 */
174 void Mqtt::initSession(Session* session, utils::Timeval keepAlive) {
175 this->session = session;
176
// Resend every PUBLISH still stored in the session.
177 for (const auto& [packetIdentifier, publish] : session->publishMap) {
178 LOG(INFO) << connectionName << " MQTT: PUBLISH Resend";
179
180 send(publish);
181 }
182
// Resend a PUBREL for every packet identifier still stored in the session.
183 for (const uint16_t packetIdentifier : session->pubrelPacketIdentifierSet) {
184 LOG(INFO) << connectionName << " MQTT: PUBREL Resend";
185
186 send(iot::mqtt::packets::Pubrel(packetIdentifier));
187 }
188
189 if (keepAlive > 0) {
// The timeout used from here on is 1.5 times the passed keep-alive value.
190 keepAlive *= 1.5;
191
192 LOG(INFO) << connectionName << " MQTT: Keep alive initialized with: " << keepAlive;
193
// Callback: captures `this` and a copy of the scaled keepAlive for the log line.
195 [this, keepAlive]() {
196 LOG(ERROR) << connectionName << " MQTT: Keep-alive timer expired. Interval was: " << keepAlive;
198 },
199 keepAlive);
200 }
201
203 }
204
205 void Mqtt::send(const ControlPacket& controlPacket) const {
206 LOG(INFO) << connectionName << " MQTT: " << controlPacket.getName() << " send: " << clientId;
207
208 send(controlPacket.serialize());
209 }
210
211 void Mqtt::send(const std::vector<char>& data) const {
212 LOG(TRACE) << connectionName << " MQTT: Send data (full message):\n" << toHexString(data);
213
214 mqttContext->send(data.data(), data.size());
215 }
216
217 void Mqtt::sendPublish(const std::string& topic, const std::string& message, uint8_t qoS,
218 bool retain) { // Server & Client
219
220 uint16_t packageIdentifier = qoS != 0 ? getPacketIdentifier() : 0;
221
222 send(iot::mqtt::packets::Publish(packageIdentifier, topic, message, qoS, false, retain));
223
224 LOG(INFO) << connectionName << " MQTT: Topic: " << topic;
225 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(message);
226 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(qoS);
227 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << _packetIdentifier;
228 LOG(DEBUG) << connectionName << " MQTT: DUP: " << false;
229 LOG(DEBUG) << connectionName << " MQTT: Retain: " << retain;
230
231 if (qoS == 2) {
232 session->publishMap.emplace(packageIdentifier,
233 iot::mqtt::packets::Publish(packageIdentifier, topic, message, qoS, true, retain));
234 }
235 }
236
237 void Mqtt::sendPuback(uint16_t packetIdentifier) const { // Server & Client
238 send(iot::mqtt::packets::Puback(packetIdentifier));
239 }
240
241 void Mqtt::sendPubrec(uint16_t packetIdentifier) const { // Server & Client
242 send(iot::mqtt::packets::Pubrec(packetIdentifier));
243 }
244
245 void Mqtt::sendPubrel(uint16_t packetIdentifier) const { // Server & Client
246 send(iot::mqtt::packets::Pubrel(packetIdentifier));
247 }
248
249 void Mqtt::sendPubcomp(uint16_t packetIdentifier) const { // Server & Client
250 send(iot::mqtt::packets::Pubcomp(packetIdentifier));
251 }
252
253 void Mqtt::onPublish([[maybe_unused]] const packets::Publish& publish) {
254 }
255
256 void Mqtt::onPuback([[maybe_unused]] const iot::mqtt::packets::Puback& puback) {
257 }
258
259 void Mqtt::onPubrec([[maybe_unused]] const iot::mqtt::packets::Pubrec& pubrec) {
260 }
261
262 void Mqtt::onPubrel([[maybe_unused]] const iot::mqtt::packets::Pubrel& pubrel) {
263 }
264
265 void Mqtt::onPubcomp([[maybe_unused]] const iot::mqtt::packets::Pubcomp& pubcomp) {
266 }
267
/*
 * Validate and log a received PUBLISH packet.
 *
 * Returns `deliver`, which starts as true and is set to false when the QoS is
 * greater than 2, when QoS > 0 comes with packet identifier 0, and in one
 * branch of the QoS 2 handling.
 *
 * NOTE(review): statements are missing from this listing (listing lines 280,
 * 284, 289, 293, 295, 298). The actions taken for invalid packets, the QoS 1
 * reply and the condition of the QoS 2 branch are not visible — consult the
 * real source before changing this function.
 */
268 bool Mqtt::_onPublish(const packets::Publish& publish) {
269 bool deliver = true;
270
271 LOG(INFO) << connectionName << " MQTT: Topic: " << publish.getTopic();
272 LOG(INFO) << connectionName << " MQTT: Message:\n" << toHexString(publish.getMessage());
273 LOG(DEBUG) << connectionName << " MQTT: QoS: " << static_cast<uint16_t>(publish.getQoS());
274 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: " << publish.getPacketIdentifier();
275 LOG(DEBUG) << connectionName << " MQTT: DUP: " << publish.getDup();
276 LOG(DEBUG) << connectionName << " MQTT: Retain: " << publish.getRetain();
277
278 if (publish.getQoS() > 2) {
// NOTE(review): getQoS() is streamed here without the static_cast<uint16_t>
// used above; if the logger behaves like std::ostream, a uint8_t is written
// as a character rather than a number — confirm and add the cast.
279 LOG(ERROR) << connectionName << " MQTT: Received invalid QoS: " << publish.getQoS();
281 deliver = false;
282 } else if (publish.getPacketIdentifier() == 0 && publish.getQoS() > 0) {
283 LOG(ERROR) << connectionName << " MQTT: Received QoS > 0 but no PackageIdentifier present";
285 deliver = false;
286 } else {
// QoS 0 has no case label and therefore needs no further handling here.
287 switch (publish.getQoS()) {
288 case 1:
290
291 break;
292 case 2:
294
296 deliver = false;
297 } else {
299 }
300
301 break;
302 }
303 }
304
305 return deliver;
306 }
307
/*
 * Internal PUBACK handler: checks and logs the packet identifier, then always
 * calls the onPuback() hook.
 *
 * NOTE(review): listing line 311 is missing from this listing; it presumably
 * holds the reaction to a missing packet identifier — confirm.
 */
308 void Mqtt::_onPuback(const iot::mqtt::packets::Puback& puback) {
// Identifier 0 is treated as "missing".
309 if (puback.getPacketIdentifier() == 0) {
310 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
312 } else {
// Logged as a zero-padded, four-digit hexadecimal number.
313 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
314 << puback.getPacketIdentifier();
315 }
316
317 onPuback(puback);
318 }
319
/*
 * Internal PUBREC handler: checks and logs the packet identifier, then always
 * calls the onPubrec() hook.
 *
 * NOTE(review): listing lines 323, 328, 329 and 331 are missing from this
 * listing. Presumably they hold the reaction to a missing identifier and the
 * session bookkeeping / PUBREL reply for a valid one — confirm against the
 * real source.
 */
320 void Mqtt::_onPubrec(const iot::mqtt::packets::Pubrec& pubrec) {
// Identifier 0 is treated as "missing".
321 if (pubrec.getPacketIdentifier() == 0) {
322 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
324 } else {
// Logged as a zero-padded, four-digit hexadecimal number.
325 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
326 << pubrec.getPacketIdentifier();
327
330
332 }
333
334 onPubrec(pubrec);
335 }
336
/*
 * Internal PUBREL handler: checks and logs the packet identifier, then always
 * calls the onPubrel() hook.
 *
 * NOTE(review): listing lines 340, 345 and 347 are missing from this listing.
 * Presumably they hold the reaction to a missing identifier and the session
 * bookkeeping / PUBCOMP reply for a valid one — confirm against the real
 * source.
 */
337 void Mqtt::_onPubrel(const iot::mqtt::packets::Pubrel& pubrel) {
// Identifier 0 is treated as "missing".
338 if (pubrel.getPacketIdentifier() == 0) {
339 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
341 } else {
// Logged as a zero-padded, four-digit hexadecimal number.
342 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
343 << pubrel.getPacketIdentifier();
344
346
348 }
349
350 onPubrel(pubrel);
351 }
352
/*
 * Internal PUBCOMP handler: checks and logs the packet identifier, then always
 * calls the onPubcomp() hook.
 *
 * NOTE(review): listing lines 356, 361 and 362 are missing from this listing.
 * Presumably they hold the reaction to a missing identifier and the session
 * bookkeeping for a valid one — confirm against the real source.
 */
353 void Mqtt::_onPubcomp(const iot::mqtt::packets::Pubcomp& pubcomp) {
// Identifier 0 is treated as "missing".
354 if (pubcomp.getPacketIdentifier() == 0) {
355 LOG(ERROR) << connectionName << " MQTT: PackageIdentifier missing";
357 } else {
// Logged as a zero-padded, four-digit hexadecimal number.
358 LOG(DEBUG) << connectionName << " MQTT: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
359 << pubcomp.getPacketIdentifier();
360
363 }
364
365 onPubcomp(pubcomp);
366 }
367
368 void Mqtt::printVP(const iot::mqtt::ControlPacket& packet) const {
369 LOG(INFO) << connectionName << " MQTT: " << packet.getName() << " received: " << clientId;
370
371 const std::string hexString = toHexString(packet.serializeVP());
372 if (!hexString.empty()) {
373 LOG(TRACE) << connectionName << " MQTT: Received data (variable header and payload):\n" << hexString;
374 }
375 }
376
377 void Mqtt::printFixedHeader(const FixedHeader& fixedHeader) const {
378 LOG(INFO) << connectionName << " MQTT: ======================================================";
379
380 LOG(TRACE) << connectionName << " MQTT: Received data (fixed header):\n" << toHexString(fixedHeader.serialize());
381
382 LOG(DEBUG) << connectionName << " MQTT: Fixed Header: PacketType: 0x" << std::hex << std::setfill('0') << std::setw(2)
383 << static_cast<uint16_t>(fixedHeader.getType()) << " (" << iot::mqtt::mqttPackageName[fixedHeader.getType()] << ")";
384 LOG(DEBUG) << connectionName << " MQTT: PacketFlags: 0x" << std::hex << std::setfill('0') << std::setw(2)
385 << static_cast<uint16_t>(fixedHeader.getFlags()) << std::dec;
386 LOG(DEBUG) << connectionName << " MQTT: RemainingLength: " << fixedHeader.getRemainingLength();
387 }
388
389 std::string Mqtt::toHexString(const std::vector<char>& data) {
390 const std::string hexDump = utils::hexDump(data, 32);
391 return !hexDump.empty() ? std::string(32, ' ').append(hexDump) : "";
392 }
393
394 std::string Mqtt::toHexString(const std::string& data) {
395 return toHexString(std::vector<char>(data.begin(), data.end()));
396 }
397
400
// NOTE(review): the signature (listing line 398) and listing lines 399 and 402
// are missing from this listing. According to the cross-reference at the end
// of the page this is `uint16_t Mqtt::getPacketIdentifier()`. Presumably the
// missing lines advance _packetIdentifier and skip the value 0 — confirm
// against the real source.
401 if (_packetIdentifier == 0) {
403 }
404
// Returns the current value of the member counter.
405 return _packetIdentifier;
406 }
407
408} // namespace iot::mqtt
void restart()
Definition Timer.cpp:90
void cancel()
Definition Timer.cpp:84
virtual void setTimeout(const utils::Timeval &timeout)=0
Timer & operator=(Timer &&timer) noexcept=default
static Timer singleshotTimer(const std::function< void()> &dispatcher, const utils::Timeval &timeout)
Definition Timer.cpp:57
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
std::vector< char > serialize() const
const std::string & getName() const
virtual std::vector< char > serializeVP() const =0
uint8_t getType() const
uint32_t getRemainingLength() const
uint8_t getFlags() const
std::vector< char > serialize() const
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
virtual void send(const char *chunk, std::size_t chunklen)=0
virtual core::socket::stream::SocketConnection * getSocketConnection() const =0
virtual void end(bool fatal=false)=0
virtual void close()=0
MqttContext * mqttContext
Definition Mqtt.h:158
static std::string toHexString(const std::string &data)
Definition Mqtt.cpp:394
const std::string & getConnectionName() const
Definition Mqtt.cpp:170
virtual void onConnected()
Definition Mqtt.cpp:95
static std::string toHexString(const std::vector< char > &data)
Definition Mqtt.cpp:389
virtual void onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
Definition Mqtt.cpp:265
virtual void onPublish(const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:253
core::timer::Timer keepAliveTimer
Definition Mqtt.h:151
iot::mqtt::FixedHeader fixedHeader
Definition Mqtt.h:146
const MqttContext * getMqttContext() const
Definition Mqtt.cpp:91
std::size_t onReceivedFromPeer()
Definition Mqtt.cpp:99
virtual void onDisconnected()
Definition Mqtt.cpp:166
void send(const std::vector< char > &data) const
Definition Mqtt.cpp:211
void setMqttContext(MqttContext *mqttContext)
Definition Mqtt.cpp:87
void sendPuback(uint16_t packetIdentifier) const
Definition Mqtt.cpp:237
virtual void onPuback(const iot::mqtt::packets::Puback &puback)
Definition Mqtt.cpp:256
std::string connectionName
Definition Mqtt.h:142
void _onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
Definition Mqtt.cpp:320
virtual void deliverPacket(iot::mqtt::ControlPacketDeserializer *controlPacketDeserializer)=0
Mqtt(const std::string &connectionName, const std::string &clientId)
Definition Mqtt.cpp:73
void sendPubrel(uint16_t packetIdentifier) const
Definition Mqtt.cpp:245
uint16_t getPacketIdentifier()
Definition Mqtt.cpp:398
bool _onPublish(const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:268
virtual iot::mqtt::ControlPacketDeserializer * createControlPacketDeserializer(iot::mqtt::FixedHeader &staticHeader)=0
void sendPublish(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
Definition Mqtt.cpp:217
virtual ~Mqtt()
Definition Mqtt.cpp:78
void printFixedHeader(const iot::mqtt::FixedHeader &fixedHeader) const
Definition Mqtt.cpp:377
uint16_t _packetIdentifier
Definition Mqtt.h:149
Session * session
Definition Mqtt.h:155
void _onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
Definition Mqtt.cpp:337
virtual void onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
Definition Mqtt.cpp:259
virtual void onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
Definition Mqtt.cpp:262
void send(const iot::mqtt::ControlPacket &controlPacket) const
Definition Mqtt.cpp:205
std::string clientId
Definition Mqtt.h:143
Mqtt(const std::string &connectionName)
Definition Mqtt.cpp:69
void initSession(Session *session, utils::Timeval keepAlive)
Definition Mqtt.cpp:174
void printVP(const iot::mqtt::ControlPacket &packet) const
Definition Mqtt.cpp:368
iot::mqtt::ControlPacketDeserializer * controlPacketDeserializer
Definition Mqtt.h:147
void sendPubcomp(uint16_t packetIdentifier) const
Definition Mqtt.cpp:249
void sendPubrec(uint16_t packetIdentifier) const
Definition Mqtt.cpp:241
void _onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
Definition Mqtt.cpp:353
void _onPuback(const iot::mqtt::packets::Puback &puback)
Definition Mqtt.cpp:308
std::set< uint16_t > publishPacketIdentifierSet
Definition Session.h:85
std::map< uint16_t, iot::mqtt::packets::Publish > publishMap
Definition Session.h:81
std::set< uint16_t > pubrelPacketIdentifierSet
Definition Session.h:82
uint16_t getPacketIdentifier() const
Definition Puback.cpp:66
Puback(uint16_t packetIdentifier)
Definition Puback.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubcomp.cpp:66
Pubcomp(uint16_t packetIdentifier)
Definition Pubcomp.cpp:57
uint8_t getQoS() const
Definition Publish.cpp:85
std::string getTopic() const
Definition Publish.cpp:93
std::string getMessage() const
Definition Publish.cpp:97
Publish(uint16_t packetIdentifier, const std::string &topic, const std::string &message, uint8_t qoS, bool dup, bool retain)
Definition Publish.cpp:56
uint16_t getPacketIdentifier() const
Definition Publish.cpp:89
Pubrec(uint16_t packetIdentifier)
Definition Pubrec.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubrec.cpp:66
Pubrel(uint16_t packetIdentifier)
Definition Pubrel.cpp:57
uint16_t getPacketIdentifier() const
Definition Pubrel.cpp:66
bool operator>(const Timeval &timeVal) const
Definition Timeval.cpp:149
Timeval & operator*=(double mul)
Definition Timeval.cpp:135
Definition Timer.h:59
const std::vector< std::string > mqttPackageName
std::string hexDump(const std::vector< char > &bytes, int prefixLength, bool prefixAtFirstLine)
Definition hexdump.cpp:58