2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
45#include "core/socket/stream/SocketConnection.h"
46#include "iot/mqtt/ControlPacketDeserializer.h"
47#include "iot/mqtt/Session.h"
48#include "iot/mqtt/packets/Puback.h"
49#include "iot/mqtt/packets/Pubcomp.h"
50#include "iot/mqtt/packets/Publish.h"
51#include "iot/mqtt/packets/Pubrec.h"
52#include "iot/mqtt/packets/Pubrel.h"
54#ifndef DOXYGEN_SHOULD_SKIP_THIS
56#include "log/Logger.h"
57#include "utils/hexdump.h"
73 Mqtt::
Mqtt(
const std::string& connectionName,
const std::string& clientId)
96 LOG(INFO) <<
"MQTT: Connected";
100 std::size_t consumed = 0;
120 LOG(DEBUG) <<
connectionName <<
" MQTT: Received packet-type is unavailable ... closing connection";
126 LOG(DEBUG) <<
connectionName <<
" MQTT: Fixed header has error ... closing connection";
142 LOG(DEBUG) <<
connectionName <<
" MQTT: Control packet has error ... closing connection";
177 for (
const auto& [packetIdentifier, publish] : session
->publishMap) {
192 LOG(INFO) <<
connectionName <<
" MQTT: Keep alive initialized with: " << keepAlive;
195 [
this, keepAlive]() {
196 LOG(ERROR) <<
connectionName <<
" MQTT: Keep-alive timer expired. Interval was: " << keepAlive;
211 void Mqtt::
send(
const std::vector<
char>& data)
const {
217 void Mqtt::
sendPublish(
const std::string& topic,
const std::string& message, uint8_t qoS,
226 LOG(DEBUG) <<
connectionName <<
" MQTT: QoS: " <<
static_cast<uint16_t>(qoS);
283 LOG(ERROR) <<
connectionName <<
" MQTT: Received QoS > 0 but no PackageIdentifier present";
310 LOG(ERROR) <<
connectionName <<
" MQTT: PackageIdentifier missing";
313 LOG(DEBUG) <<
connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
322 LOG(ERROR) <<
connectionName <<
" MQTT: PackageIdentifier missing";
325 LOG(DEBUG) <<
connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
339 LOG(ERROR) <<
connectionName <<
" MQTT: PackageIdentifier missing";
342 LOG(DEBUG) <<
connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
355 LOG(ERROR) <<
connectionName <<
" MQTT: PackageIdentifier missing";
358 LOG(DEBUG) <<
connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
372 if (!hexString.empty()) {
373 LOG(TRACE) <<
connectionName <<
" MQTT: Received data (variable header and payload):\n" << hexString;
378 LOG(INFO) <<
connectionName <<
" MQTT: ======================================================";
382 LOG(DEBUG) <<
connectionName <<
" MQTT: Fixed Header: PacketType: 0x" << std::hex << std::setfill(
'0') << std::setw(2)
384 LOG(DEBUG) <<
connectionName <<
" MQTT: PacketFlags: 0x" << std::hex << std::setfill(
'0') << std::setw(2)
385 <<
static_cast<uint16_t>(fixedHeader
.getFlags()) << std::dec;
391 return !hexDump.empty() ? std::string(32,
' ').append(hexDump) :
"";
395 return toHexString(std::vector<
char>(data.begin(), data.end())
);
virtual void setTimeout(const utils::Timeval &timeout)=0
Timer & operator=(Timer &&timer) noexcept=default
static Timer singleshotTimer(const std::function< void()> &dispatcher, const utils::Timeval &timeout)
std::size_t deserialize(iot::mqtt::MqttContext *mqttContext)
std::vector< char > serialize() const
const std::string & getName() const
virtual std::vector< char > serializeVP() const =0
virtual void send(const char *chunk, std::size_t chunklen)=0
virtual core::socket::stream::SocketConnection * getSocketConnection() const =0
virtual void end(bool fatal=false)=0
MqttContext * mqttContext
static std::string toHexString(const std::string &data)
const std::string & getConnectionName() const
virtual void onConnected()
static std::string toHexString(const std::vector< char > &data)
virtual void onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
virtual void onPublish(const iot::mqtt::packets::Publish &publish)
core::timer::Timer keepAliveTimer
iot::mqtt::FixedHeader fixedHeader
const MqttContext * getMqttContext() const
std::size_t onReceivedFromPeer()
virtual void onDisconnected()
void send(const std::vector< char > &data) const
void setMqttContext(MqttContext *mqttContext)
void sendPuback(uint16_t packetIdentifier) const
virtual void onPuback(const iot::mqtt::packets::Puback &puback)
std::string connectionName
void _onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
virtual void deliverPacket(iot::mqtt::ControlPacketDeserializer *controlPacketDeserializer)=0
Mqtt(const std::string &connectionName, const std::string &clientId)
void sendPubrel(uint16_t packetIdentifier) const
uint16_t getPacketIdentifier()
bool _onPublish(const iot::mqtt::packets::Publish &publish)
virtual iot::mqtt::ControlPacketDeserializer * createControlPacketDeserializer(iot::mqtt::FixedHeader &staticHeader)=0
void sendPublish(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
void printFixedHeader(const iot::mqtt::FixedHeader &fixedHeader) const
uint16_t _packetIdentifier
void _onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
virtual void onPubrec(const iot::mqtt::packets::Pubrec &pubrec)
virtual void onPubrel(const iot::mqtt::packets::Pubrel &pubrel)
void send(const iot::mqtt::ControlPacket &controlPacket) const
Mqtt(const std::string &connectionName)
void initSession(Session *session, utils::Timeval keepAlive)
void printVP(const iot::mqtt::ControlPacket &packet) const
iot::mqtt::ControlPacketDeserializer * controlPacketDeserializer
void sendPubcomp(uint16_t packetIdentifier) const
void sendPubrec(uint16_t packetIdentifier) const
void _onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp)
void _onPuback(const iot::mqtt::packets::Puback &puback)
std::set< uint16_t > publishPacketIdentifierSet
std::map< uint16_t, iot::mqtt::packets::Publish > publishMap
std::set< uint16_t > pubrelPacketIdentifierSet
uint16_t getPacketIdentifier() const
Puback(uint16_t packetIdentifier)
uint16_t getPacketIdentifier() const
Pubcomp(uint16_t packetIdentifier)
std::string getTopic() const
std::string getMessage() const
Publish(uint16_t packetIdentifier, const std::string &topic, const std::string &message, uint8_t qoS, bool dup, bool retain)
uint16_t getPacketIdentifier() const
Pubrec(uint16_t packetIdentifier)
uint16_t getPacketIdentifier() const
Pubrel(uint16_t packetIdentifier)
uint16_t getPacketIdentifier() const
bool operator>(const Timeval &timeVal) const
Timeval & operator*=(double mul)
const std::vector< std::string > mqttPackageName
std::string hexDump(const std::vector< char > &bytes, int prefixLength, bool prefixAtFirstLine)