SNode.C
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "iot/mqtt/server/Mqtt.h"
21
22#include "iot/mqtt/MqttContext.h"
23#include "iot/mqtt/packets/Connack.h"
24#include "iot/mqtt/packets/Pingresp.h"
25#include "iot/mqtt/packets/Suback.h"
26#include "iot/mqtt/packets/Unsuback.h"
27#include "iot/mqtt/server/broker/Broker.h"
28#include "iot/mqtt/server/packets/Connect.h"
29#include "iot/mqtt/server/packets/Disconnect.h"
30#include "iot/mqtt/server/packets/Pingreq.h"
31#include "iot/mqtt/server/packets/Puback.h"
32#include "iot/mqtt/server/packets/Pubcomp.h"
33#include "iot/mqtt/server/packets/Publish.h"
34#include "iot/mqtt/server/packets/Pubrec.h"
35#include "iot/mqtt/server/packets/Pubrel.h"
36#include "iot/mqtt/server/packets/Subscribe.h"
37#include "iot/mqtt/server/packets/Unsubscribe.h"
38
39#ifndef DOXYGEN_SHOULD_SKIP_THIS
40
41#include "log/Logger.h"
42
43#include <cstdint>
44#include <iomanip>
45#include <ios>
46
47#endif // DOXYGEN_SHOULD_SKIP_THIS
48
49namespace iot::mqtt::server {
50
51 Mqtt::Mqtt(const std::string& connectionName, const std::shared_ptr<broker::Broker>& broker)
52 : Super(connectionName)
53 , broker(broker) {
54 }
55
56 Mqtt::~Mqtt() {
58
59 if (willFlag) {
60 broker->publish(clientId, willTopic, willMessage, willQoS, willRetain);
61 }
62 }
63
64 bool Mqtt::onSignal([[maybe_unused]] int sig) {
65 willFlag = false;
66 return true;
67 }
68
69 iot::mqtt::ControlPacketDeserializer* Mqtt::createControlPacketDeserializer(iot::mqtt::FixedHeader& fixedHeader) {
70 iot::mqtt::ControlPacketDeserializer* controlPacketDeserializer = nullptr;
71
72 switch (fixedHeader.getType()) {
73 case MQTT_CONNECT:
74 controlPacketDeserializer =
75 new iot::mqtt::server::packets::Connect(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
76 break;
77 case MQTT_PUBLISH:
78 controlPacketDeserializer =
79 new iot::mqtt::server::packets::Publish(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
80 break;
81 case MQTT_PUBACK:
82 controlPacketDeserializer =
83 new iot::mqtt::server::packets::Puback(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
84 break;
85 case MQTT_PUBREC:
86 controlPacketDeserializer =
87 new iot::mqtt::server::packets::Pubrec(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
88 break;
89 case MQTT_PUBREL:
90 controlPacketDeserializer =
91 new iot::mqtt::server::packets::Pubrel(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
92 break;
93 case MQTT_PUBCOMP:
94 controlPacketDeserializer =
95 new iot::mqtt::server::packets::Pubcomp(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
96 break;
97 case MQTT_SUBSCRIBE:
98 controlPacketDeserializer =
99 new iot::mqtt::server::packets::Subscribe(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
100 break;
101 case MQTT_UNSUBSCRIBE:
102 controlPacketDeserializer =
103 new iot::mqtt::server::packets::Unsubscribe(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
104 break;
105 case MQTT_PINGREQ:
106 controlPacketDeserializer =
107 new iot::mqtt::server::packets::Pingreq(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
108 break;
109 case MQTT_DISCONNECT:
110 controlPacketDeserializer =
111 new iot::mqtt::server::packets::Disconnect(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
112 break;
113 default:
114 controlPacketDeserializer = nullptr;
115 break;
116 }
117
118 return controlPacketDeserializer;
119 }
120
121 void Mqtt::deliverPacket(iot::mqtt::ControlPacketDeserializer* controlPacketDeserializer) {
122 static_cast<iot::mqtt::server::ControlPacketDeserializer*>(controlPacketDeserializer)->deliverPacket(this); // NOLINT
123 }
124
125 bool Mqtt::initSession(const utils::Timeval& keepAlive) {
126 bool success = true;
127
128 if (broker->hasActiveSession(clientId)) {
129 LOG(ERROR) << connectionName << " MQTT Broker: Existing session found for ClientId = " << clientId;
130 LOG(ERROR) << connectionName << " MQTT Broker: closing";
132
133 willFlag = false;
134 success = false;
135 } else if (broker->hasRetainedSession(clientId)) {
136 LOG(INFO) << connectionName << " MQTT Broker: Retained session found for ClientId = " << clientId;
137 if (cleanSession) {
138 LOG(DEBUG) << connectionName << " New SessionId = " << this;
140
141 broker->unsubscribe(clientId);
142 initSession(broker->newSession(clientId, this), keepAlive);
143 } else {
144 LOG(DEBUG) << connectionName << " Renew SessionId = " << this;
146
147 initSession(broker->renewSession(clientId, this), keepAlive);
148 broker->restartSession(clientId);
149 }
150 } else {
151 LOG(INFO) << connectionName << " MQTT Broker: No session found for ClientId = " << clientId;
152 LOG(INFO) << connectionName << " MQTT Broker: new SessionId = " << this;
153
155
156 initSession(broker->newSession(clientId, this), keepAlive);
157 }
158
159 return success;
160 }
161
163 if (broker->isActiveSession(clientId, this)) {
164 if (cleanSession) {
165 LOG(DEBUG) << connectionName << " MQTT Broker: Delete session for ClientId = " << clientId;
166 LOG(DEBUG) << connectionName << " MQTT Broker: SessionId = " << this;
167 broker->deleteSession(clientId);
168 } else {
169 LOG(DEBUG) << connectionName << " MQTT Broker: Retain session for ClientId = " << clientId;
170 LOG(DEBUG) << connectionName << " MQTT Broker: SessionId = " << this;
171 broker->retainSession(clientId);
172 }
173 }
174 }
175
176 void Mqtt::onConnect([[maybe_unused]] const iot::mqtt::packets::Connect& connect) {
177 }
178
179 void Mqtt::onSubscribe([[maybe_unused]] const iot::mqtt::packets::Subscribe& subscribe) {
180 }
181
182 void Mqtt::onUnsubscribe([[maybe_unused]] const iot::mqtt::packets::Unsubscribe& unsubscribe) {
183 }
184
185 void Mqtt::onPingreq([[maybe_unused]] const iot::mqtt::packets::Pingreq& pingreq) {
186 }
187
188 void Mqtt::onDisconnect([[maybe_unused]] const iot::mqtt::packets::Disconnect& disconnect) {
189 }
190
191 void Mqtt::_onConnect(const iot::mqtt::server::packets::Connect& connect) {
192 LOG(INFO) << connectionName << " MQTT Broker: Protocol: " << connect.getProtocol();
193 LOG(INFO) << connectionName << " MQTT Broker: Version: " << static_cast<uint16_t>(connect.getLevel());
194 LOG(INFO) << connectionName << " MQTT Broker: ConnectFlags: 0x" << std::hex << std::setfill('0') << std::setw(2)
195 << static_cast<uint16_t>(connect.getConnectFlags()) << std::dec << std::setw(0);
196 LOG(INFO) << connectionName << " MQTT Broker: KeepAlive: " << connect.getKeepAlive();
197 LOG(INFO) << connectionName << " MQTT Broker: ClientID: " << connect.getClientId();
198 LOG(INFO) << connectionName << " MQTT Broker: CleanSession: " << connect.getCleanSession();
199
200 if (connect.getWillFlag()) {
201 LOG(INFO) << connectionName << " MQTT Broker: WillTopic: " << connect.getWillTopic();
202 LOG(INFO) << connectionName << " MQTT Broker: WillMessage: " << connect.getWillMessage();
203 LOG(INFO) << connectionName << " MQTT Broker: WillQoS: " << static_cast<uint16_t>(connect.getWillQoS());
204 LOG(INFO) << connectionName << " MQTT Broker: WillRetain: " << connect.getWillRetain();
205 }
206 if (connect.getUsernameFlag()) {
207 LOG(INFO) << connectionName << " MQTT Broker: Username: " << connect.getUsername();
208 }
209 if (connect.getPasswordFlag()) {
210 LOG(INFO) << connectionName << " MQTT Broker: Password: " << connect.getPassword();
211 }
212
213 if (connect.getProtocol() != "MQTT") {
214 LOG(ERROR) << connectionName << " MQTT Broker: Wrong Protocol: " << connect.getProtocol();
215 mqttContext->end(true);
216 } else if ((connect.getLevel()) != MQTT_VERSION_3_1_1) {
217 LOG(ERROR) << connectionName << " MQTT Broker: Wrong Protocol Level: " << MQTT_VERSION_3_1_1 << " != " << connect.getLevel();
219
220 mqttContext->end(true);
221 } else if (connect.isFakedClientId() && !connect.getCleanSession()) {
222 LOG(ERROR) << connectionName << " MQTT Broker: Resume session but no ClientId present";
224
225 mqttContext->end(true);
226 } else {
227 // V-Header
228 protocol = connect.getProtocol();
229 level = connect.getLevel();
230 reflect = connect.getReflect();
231 connectFlags = connect.getConnectFlags();
232 keepAlive = connect.getKeepAlive();
233
234 // Payload
235 clientId = connect.getClientId();
236 willTopic = connect.getWillTopic();
237 willMessage = connect.getWillMessage();
238 username = connect.getUsername();
239 password = connect.getPassword();
240
241 // Derived from flags
242 usernameFlag = connect.getUsernameFlag();
243 passwordFlag = connect.getPasswordFlag();
244 willRetain = connect.getWillRetain();
245 willQoS = connect.getWillQoS();
246 willFlag = connect.getWillFlag();
247 cleanSession = connect.getCleanSession();
248
249 if (initSession(1.5 * keepAlive)) {
250 onConnect(connect);
251 } else {
252 mqttContext->end(true);
253 }
254 }
255 }
256
257 void Mqtt::_onPublish(const iot::mqtt::server::packets::Publish& publish) {
258 if (Super::_onPublish(publish)) {
259 broker->publish(clientId, publish.getTopic(), publish.getMessage(), publish.getQoS(), publish.getRetain());
260
261 onPublish(publish);
262 }
263 }
264
265 void Mqtt::_onSubscribe(const iot::mqtt::server::packets::Subscribe& subscribe) {
266 if (subscribe.getPacketIdentifier() == 0) {
267 LOG(ERROR) << connectionName << " MQTT Broker: PackageIdentifier missing";
268 mqttContext->end(true);
269 } else {
270 LOG(DEBUG) << connectionName << " MQTT Broker: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
271 << subscribe.getPacketIdentifier();
272
273 for (const iot::mqtt::Topic& topic : subscribe.getTopics()) {
274 LOG(INFO) << connectionName << " MQTT Broker: Topic filter: '" << topic.getName()
275 << "', QoS: " << static_cast<uint16_t>(topic.getQoS());
276 }
277
278 std::list<uint8_t> returnCodes;
279 for (const iot::mqtt::Topic& topic : subscribe.getTopics()) {
280 const uint8_t returnCode = broker->subscribe(clientId, topic.getName(), topic.getQoS());
281 returnCodes.push_back(returnCode);
282 }
283
284 sendSuback(subscribe.getPacketIdentifier(), returnCodes);
285
286 onSubscribe(subscribe);
287 }
288 }
289
290 void Mqtt::_onUnsubscribe(const iot::mqtt::server::packets::Unsubscribe& unsubscribe) {
291 if (unsubscribe.getPacketIdentifier() == 0) {
292 LOG(ERROR) << connectionName << " MQTT Broker: PackageIdentifier missing";
293 mqttContext->end(true);
294 } else {
295 LOG(DEBUG) << connectionName << " MQTT Broker: PacketIdentifier: 0x" << std::hex << std::setfill('0') << std::setw(4)
296 << unsubscribe.getPacketIdentifier();
297
298 for (const std::string& topic : unsubscribe.getTopics()) {
299 LOG(INFO) << connectionName << " MQTT Broker: Topic: " << topic;
300 }
301
302 for (const std::string& topic : unsubscribe.getTopics()) {
303 broker->unsubscribe(clientId, topic);
304 }
305
306 sendUnsuback(unsubscribe.getPacketIdentifier());
307
308 onUnsubscribe(unsubscribe);
309 }
310 }
311
312 void Mqtt::_onPingreq(const iot::mqtt::server::packets::Pingreq& pingreq) {
314
315 onPingreq(pingreq);
316 }
317
318 void Mqtt::_onDisconnect(const iot::mqtt::server::packets::Disconnect& disconnect) {
319 willFlag = false;
320
321 onDisconnect(disconnect);
322
324 }
325
326 void Mqtt::sendConnack(uint8_t returnCode, uint8_t flags) const { // Server
327 send(iot::mqtt::packets::Connack(returnCode, flags));
328 }
329
330 void Mqtt::sendSuback(uint16_t packetIdentifier, const std::list<uint8_t>& returnCodes) const { // Server
331 send(iot::mqtt::packets::Suback(packetIdentifier, returnCodes));
332 }
333
334 void Mqtt::sendUnsuback(uint16_t packetIdentifier) const { // Server
335 send(iot::mqtt::packets::Unsuback(packetIdentifier));
336 }
337
338 void Mqtt::sendPingresp() const { // Server
339 send(iot::mqtt::packets::Pingresp());
340 }
341
343 return protocol;
344 }
345
346 uint8_t Mqtt::getLevel() const {
347 return level;
348 }
349
350 uint8_t Mqtt::getConnectFlags() const {
351 return connectFlags;
352 }
353
354 uint16_t Mqtt::getKeepAlive() const {
355 return keepAlive;
356 }
357
359 return clientId;
360 }
361
363 return willTopic;
364 }
365
367 return willMessage;
368 }
369
371 return username;
372 }
373
375 return password;
376 }
377
378 bool Mqtt::getUsernameFlag() const {
379 return usernameFlag;
380 }
381
382 bool Mqtt::getPasswordFlag() const {
383 return passwordFlag;
384 }
385
386 bool Mqtt::getWillRetain() const {
387 return willRetain;
388 }
389
390 uint8_t Mqtt::getWillQoS() const {
391 return willQoS;
392 }
393
394 bool Mqtt::getWillFlag() const {
395 return willFlag;
396 }
397
398 bool Mqtt::getCleanSession() const {
399 return cleanSession;
400 }
401
402 bool Mqtt::getReflect() const {
403 return reflect;
404 }
405
406} // namespace iot::mqtt::server
bool getReflect() const
Definition Mqtt.cpp:402
uint8_t getLevel() const
Definition Mqtt.cpp:346
bool getPasswordFlag() const
Definition Mqtt.cpp:382
void sendSuback(uint16_t packetIdentifier, const std::list< uint8_t > &returnCodes) const
Definition Mqtt.cpp:330
bool getCleanSession() const
Definition Mqtt.cpp:398
bool onSignal(int sig) override
Definition Mqtt.cpp:64
uint8_t getConnectFlags() const
Definition Mqtt.cpp:350
uint16_t getKeepAlive() const
Definition Mqtt.cpp:354
bool getWillRetain() const
Definition Mqtt.cpp:386
std::string getProtocol() const
Definition Mqtt.cpp:342
bool getUsernameFlag() const
Definition Mqtt.cpp:378
iot::mqtt::ControlPacketDeserializer * createControlPacketDeserializer(iot::mqtt::FixedHeader &fixedHeader) final
Definition Mqtt.cpp:69
bool initSession(const utils::Timeval &keepAlive)
Definition Mqtt.cpp:125
std::string getWillMessage() const
Definition Mqtt.cpp:366
void sendPingresp() const
Definition Mqtt.cpp:338
void sendConnack(uint8_t returnCode, uint8_t flags) const
Definition Mqtt.cpp:326
std::string getClientId() const
Definition Mqtt.cpp:358
uint8_t getWillQoS() const
Definition Mqtt.cpp:390
std::string getWillTopic() const
Definition Mqtt.cpp:362
bool getWillFlag() const
Definition Mqtt.cpp:394
void sendUnsuback(uint16_t packetIdentifier) const
Definition Mqtt.cpp:334
std::string getUsername() const
Definition Mqtt.cpp:370
std::string getPassword() const
Definition Mqtt.cpp:374
#define MQTT_SESSION_NEW
Definition Connack.h:44
#define MQTT_CONNACK_ACCEPT
Definition Connack.h:37
#define MQTT_CONNACK_UNACEPTABLEVERSION
Definition Connack.h:38
#define MQTT_SESSION_PRESENT
Definition Connack.h:45
#define MQTT_CONNACK_IDENTIFIERREJECTED
Definition Connack.h:39
#define MQTT_CONNECT
Definition Connect.h:36
#define MQTT_VERSION_3_1_1
Definition Connect.h:38
#define MQTT_DISCONNECT
Definition Disconnect.h:33
#define MQTT_PINGREQ
Definition Pingreq.h:33
#define MQTT_PUBACK
Definition Puback.h:35
#define MQTT_PUBCOMP
Definition Pubcomp.h:35
#define MQTT_PUBLISH
Definition Publish.h:36
#define MQTT_PUBREC
Definition Pubrec.h:35
#define MQTT_PUBREL
Definition Pubrel.h:35
#define MQTT_SUBSCRIBE
Definition Subscribe.h:37
#define MQTT_UNSUBSCRIBE
Definition Unsubscribe.h:37