2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
20#include "iot/mqtt/client/Mqtt.h"
22#include "iot/mqtt/MqttContext.h"
23#include "iot/mqtt/client/packets/Connack.h"
24#include "iot/mqtt/client/packets/Pingresp.h"
25#include "iot/mqtt/client/packets/Puback.h"
26#include "iot/mqtt/client/packets/Pubcomp.h"
27#include "iot/mqtt/client/packets/Publish.h"
28#include "iot/mqtt/client/packets/Pubrec.h"
29#include "iot/mqtt/client/packets/Pubrel.h"
30#include "iot/mqtt/client/packets/Suback.h"
31#include "iot/mqtt/client/packets/Unsuback.h"
32#include "iot/mqtt/packets/Connect.h"
33#include "iot/mqtt/packets/Disconnect.h"
34#include "iot/mqtt/packets/Pingreq.h"
35#include "iot/mqtt/packets/Subscribe.h"
36#include "iot/mqtt/packets/Unsubscribe.h"
38#ifndef DOXYGEN_SHOULD_SKIP_THIS
40#include "log/Logger.h"
48#include <nlohmann/json.hpp>
55namespace iot::mqtt::client {
57 Mqtt::
Mqtt(
const std::string& connectionName,
const std::string& clientId)
60 if (!sessionStoreFileName.empty()) {
61 std::ifstream sessionStoreFile(sessionStoreFileName);
63 if (sessionStoreFile.is_open()) {
65 nlohmann::json sessionStoreJson;
67 sessionStoreFile >> sessionStoreJson;
69 session.fromJson(sessionStoreJson);
71 LOG(DEBUG) << connectionName <<
" MQTT Client: ... Persistent session data loaded successful";
72 }
catch (
const nlohmann::json::exception&) {
73 LOG(DEBUG) << connectionName <<
" MQTT Client: ... Starting with empty session: Session store '"
74 << sessionStoreFileName <<
"' empty or corrupted";
79 sessionStoreFile.close();
80 std::remove(sessionStoreFileName.data());
82 LOG(INFO) << connectionName <<
" MQTT Client: Restoring saved session done";
84 PLOG(WARNING) << connectionName <<
" MQTT Client: ... Could not read session store '" << sessionStoreFileName <<
"'";
87 LOG(INFO) << connectionName <<
" MQTT Client: Session not reloaded: Session store filename empty";
92 if (!sessionStoreFileName.empty()) {
93 const nlohmann::json sessionJson = session.toJson();
95 std::ofstream sessionStoreFile(sessionStoreFileName);
97 if (sessionStoreFile.is_open()) {
98 if (!sessionJson.empty()) {
99 sessionStoreFile << sessionJson;
102 sessionStoreFile.close();
104 PLOG(DEBUG) << connectionName <<
" MQTT Client: Could not write session store '" << sessionStoreFileName <<
"'";
107 LOG(INFO) << connectionName <<
" MQTT Client: Session not saved: Session store filename empty";
114 iot::mqtt::ControlPacketDeserializer* currentPacket =
nullptr;
116 switch (fixedHeader.getType()) {
118 currentPacket =
new iot::mqtt::client::packets::Connack(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
121 currentPacket =
new iot::mqtt::client::packets::Publish(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
124 currentPacket =
new iot::mqtt::client::packets::Puback(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
127 currentPacket =
new iot::mqtt::client::packets::Pubrec(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
130 currentPacket =
new iot::mqtt::client::packets::Pubrel(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
133 currentPacket =
new iot::mqtt::client::packets::Pubcomp(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
136 currentPacket =
new iot::mqtt::client::packets::Suback(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
139 currentPacket =
new iot::mqtt::client::packets::Unsuback(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
142 currentPacket =
new iot::mqtt::client::packets::Pingresp(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
145 currentPacket =
nullptr;
149 return currentPacket;
152 void Mqtt::deliverPacket(iot::mqtt::ControlPacketDeserializer* controlPacketDeserializer) {
153 static_cast<iot::mqtt::client::ControlPacketDeserializer*>(controlPacketDeserializer)->deliverPacket(
this);
173 LOG(INFO) << connectionName <<
" MQTT Client: Acknowledge Flag: " <<
static_cast<
int>(connack.getAcknowledgeFlags());
174 LOG(INFO) << connectionName <<
" MQTT Client: Return code: " <<
static_cast<
int>(connack.getReturnCode());
175 LOG(INFO) << connectionName <<
" MQTT Client: Session present: " << connack.getSessionPresent();
178 LOG(ERROR) << connectionName <<
" MQTT Client: Negative ack received";
180 initSession(&session, keepAlive);
182 pingTimer = core::timer::Timer::intervalTimer(
193 if (Super::_onPublish(publish)) {
199 if (suback.getPacketIdentifier() == 0) {
200 LOG(ERROR) << connectionName <<
" MQTT Client: PackageIdentifier missing";
201 mqttContext->end(
true);
203 LOG(DEBUG) << connectionName <<
" MQTT Client: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
204 << suback.getPacketIdentifier();
206 std::stringstream ss;
207 std::list<uint8_t>::size_type i = 0;
209 for (
const uint8_t returnCode : suback.getReturnCodes()) {
210 if (i != 0 && i % 8 == 0 && i != suback.getReturnCodes().size()) {
215 ss <<
"0x" << std::hex << std::setfill(
'0') << std::setw(2) <<
static_cast<uint16_t>(returnCode) <<
" ";
218 LOG(DEBUG) << connectionName <<
" MQTT Client: Return codes: " << ss.str();
225 if (unsuback.getPacketIdentifier() == 0) {
226 LOG(ERROR) << connectionName <<
" MQTT Client: PacketIdentifier missing";
227 mqttContext->end(
true);
229 LOG(DEBUG) << connectionName <<
" MQTT Client: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
230 << unsuback.getPacketIdentifier();
241 const std::string& clientId,
243 const std::string& willTopic,
244 const std::string& willMessage,
247 const std::string& username,
248 const std::string& password,
249 bool loopPrevention) {
250 this->clientId = clientId;
252 LOG(INFO) << connectionName <<
" MQTT Client: CONNECT send: " << clientId;
254 send(iot::mqtt::packets::Connect(
255 clientId, keepAlive, cleanSession, willTopic, willMessage, willQoS, willRetain, username, password, loopPrevention));
261 send(iot::mqtt::packets::Subscribe(getPacketIdentifier(), topics));
265 send(iot::mqtt::packets::Unsubscribe(getPacketIdentifier(), topics));
269 send(iot::mqtt::
packets::Pingreq());
273 send(iot::mqtt::
packets::Disconnect());
void _onPublish(const iot::mqtt::client::packets::Publish &publish)
void sendConnect(uint16_t keepAlive, const std::string &clientId, bool cleanSession, const std::string &willTopic, const std::string &willMessage, uint8_t willQoS, bool willRetain, const std::string &username, const std::string &password, bool loopPrevention=false)
void _onPingresp(const iot::mqtt::client::packets::Pingresp &pingresp)
virtual void onConnack(const iot::mqtt::packets::Connack &connack)
virtual void onPingresp(const iot::mqtt::packets::Pingresp &pingresp)
void sendSubscribe(const std::list< Topic > &topics)
iot::mqtt::ControlPacketDeserializer * createControlPacketDeserializer(iot::mqtt::FixedHeader &fixedHeader) final
void _onSuback(const iot::mqtt::client::packets::Suback &suback)
virtual void onUnsuback(const iot::mqtt::packets::Unsuback &unsuback)
void _onConnack(const iot::mqtt::client::packets::Connack &connack)
void _onUnsuback(const iot::mqtt::client::packets::Unsuback &unsuback)
void sendUnsubscribe(const std::list< std::string > &topics)
bool onSignal(int sig) override
virtual void onSuback(const iot::mqtt::packets::Suback &suback)
void sendDisconnect() const
Mqtt(const std::string &connectionName, const std::string &clientId)
#define MQTT_CONNACK_ACCEPT