84 if (!fixedHeader.isComplete()) {
87 if (fixedHeader.isError()) {
91 printFixedHeader(fixedHeader);
93 controlPacketDeserializer = createControlPacketDeserializer(fixedHeader);
97 if (controlPacketDeserializer ==
nullptr) {
98 LOG(DEBUG) << connectionName <<
" MQTT: Received packet-type is unavailable ... closing connection";
100 mqttContext->end(
true);
103 if (controlPacketDeserializer->isError()) {
104 LOG(DEBUG) << connectionName <<
" MQTT: Fixed header has error ... closing connection";
106 delete controlPacketDeserializer;
107 controlPacketDeserializer =
nullptr;
109 mqttContext->end(
true);
117 consumed += controlPacketDeserializer->deserialize(mqttContext);
119 if (controlPacketDeserializer->isError()) {
120 LOG(DEBUG) << connectionName <<
" MQTT: Control packet has error ... closing connection";
121 mqttContext->end(
true);
123 delete controlPacketDeserializer;
124 controlPacketDeserializer =
nullptr;
127 }
else if (controlPacketDeserializer->isComplete()) {
128 deliverPacket(controlPacketDeserializer);
130 delete controlPacketDeserializer;
131 controlPacketDeserializer =
nullptr;
135 keepAliveTimer.restart();
144 void Mqtt::onDisconnected() {
145 LOG(INFO) << connectionName <<
" MQTT: Disconnected";
148 const std::string& Mqtt::getConnectionName()
const {
149 return connectionName;
152 void Mqtt::initSession(Session* session, utils::Timeval keepAlive) {
153 this->session = session;
155 for (
const auto& [packetIdentifier, publish] : session->publishMap) {
156 LOG(INFO) << connectionName <<
" MQTT: PUBLISH Resend";
161 for (
const uint16_t packetIdentifier : session->pubrelPacketIdentifierSet) {
162 LOG(INFO) << connectionName <<
" MQTT: PUBREL Resend";
164 send(iot::mqtt::packets::Pubrel(packetIdentifier));
170 LOG(INFO) << connectionName <<
" MQTT: Keep alive initialized with: " << keepAlive;
172 keepAliveTimer = core::timer::Timer::singleshotTimer(
173 [
this, keepAlive]() {
174 LOG(ERROR) << connectionName <<
" MQTT: Keep-alive timer expired. Interval was: " << keepAlive;
175 mqttContext->close();
180 mqttContext->getSocketConnection()->setTimeout(0);
195 void Mqtt::sendPublish(
const std::string& topic,
const std::string& message, uint8_t qoS,
198 uint16_t packageIdentifier = qoS != 0 ? getPacketIdentifier() : 0;
200 send(iot::mqtt::packets::Publish(packageIdentifier, topic, message, qoS,
false, retain));
202 LOG(INFO) << connectionName <<
" MQTT: Topic: " << topic;
203 LOG(INFO) << connectionName <<
" MQTT: Message:\n" << toHexString(message);
204 LOG(DEBUG) << connectionName <<
" MQTT: QoS: " <<
static_cast<uint16_t>(qoS);
205 LOG(DEBUG) << connectionName <<
" MQTT: PacketIdentifier: " << _packetIdentifier;
206 LOG(DEBUG) << connectionName <<
" MQTT: DUP: " <<
false;
207 LOG(DEBUG) << connectionName <<
" MQTT: Retain: " << retain;
210 session->publishMap.emplace(packageIdentifier,
211 iot::mqtt::packets::Publish(packageIdentifier, topic, message, qoS,
true, retain));
246 bool Mqtt::_onPublish(
const packets::Publish& publish) {
249 LOG(INFO) << connectionName <<
" MQTT: Topic: " << publish.getTopic();
250 LOG(INFO) << connectionName <<
" MQTT: Message:\n" << toHexString(publish.getMessage());
251 LOG(DEBUG) << connectionName <<
" MQTT: QoS: " <<
static_cast<uint16_t>(publish.getQoS());
252 LOG(DEBUG) << connectionName <<
" MQTT: PacketIdentifier: " << publish.getPacketIdentifier();
253 LOG(DEBUG) << connectionName <<
" MQTT: DUP: " << publish.getDup();
254 LOG(DEBUG) << connectionName <<
" MQTT: Retain: " << publish.getRetain();
256 if (publish.getQoS() > 2) {
257 LOG(ERROR) << connectionName <<
" MQTT: Received invalid QoS: " << publish.getQoS();
258 mqttContext->end(
true);
260 }
else if (publish.getPacketIdentifier() == 0 && publish.getQoS() > 0) {
261 LOG(ERROR) << connectionName <<
" MQTT: Received QoS > 0 but no PackageIdentifier present";
262 mqttContext->end(
true);
265 switch (publish.getQoS()) {
267 sendPuback(publish.getPacketIdentifier());
271 sendPubrec(publish.getPacketIdentifier());
273 if (session->publishPacketIdentifierSet.contains(publish.getPacketIdentifier())) {
276 session->publishPacketIdentifierSet.insert(publish.getPacketIdentifier());
286 void Mqtt::_onPuback(
const iot::mqtt::packets::Puback& puback) {
287 if (puback.getPacketIdentifier() == 0) {
288 LOG(ERROR) << connectionName <<
" MQTT: PackageIdentifier missing";
289 mqttContext->end(
true);
291 LOG(DEBUG) << connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
292 << puback.getPacketIdentifier();
298 void Mqtt::_onPubrec(
const iot::mqtt::packets::Pubrec& pubrec) {
299 if (pubrec.getPacketIdentifier() == 0) {
300 LOG(ERROR) << connectionName <<
" MQTT: PackageIdentifier missing";
301 mqttContext->end(
true);
303 LOG(DEBUG) << connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
304 << pubrec.getPacketIdentifier();
306 session->publishMap.erase(pubrec.getPacketIdentifier());
307 session->pubrelPacketIdentifierSet.insert(pubrec.getPacketIdentifier());
309 sendPubrel(pubrec.getPacketIdentifier());
315 void Mqtt::_onPubrel(
const iot::mqtt::packets::Pubrel& pubrel) {
316 if (pubrel.getPacketIdentifier() == 0) {
317 LOG(ERROR) << connectionName <<
" MQTT: PackageIdentifier missing";
318 mqttContext->end(
true);
320 LOG(DEBUG) << connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
321 << pubrel.getPacketIdentifier();
323 session->publishPacketIdentifierSet.erase(pubrel.getPacketIdentifier());
325 sendPubcomp(pubrel.getPacketIdentifier());
331 void Mqtt::_onPubcomp(
const iot::mqtt::packets::Pubcomp& pubcomp) {
332 if (pubcomp.getPacketIdentifier() == 0) {
333 LOG(ERROR) << connectionName <<
" MQTT: PackageIdentifier missing";
334 mqttContext->end(
true);
336 LOG(DEBUG) << connectionName <<
" MQTT: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
337 << pubcomp.getPacketIdentifier();
339 session->publishMap.erase(pubcomp.getPacketIdentifier());
340 session->pubrelPacketIdentifierSet.erase(pubcomp.getPacketIdentifier());
355 void Mqtt::printFixedHeader(
const FixedHeader& fixedHeader)
const {
356 LOG(INFO) << connectionName <<
" MQTT: ======================================================";
358 LOG(TRACE) << connectionName <<
" MQTT: Received data (fixed header):\n" << toHexString(fixedHeader.serialize());
360 LOG(DEBUG) << connectionName <<
" MQTT: Fixed Header: PacketType: 0x" << std::hex << std::setfill(
'0') << std::setw(2)
361 <<
static_cast<uint16_t>(fixedHeader.getType()) <<
" (" << iot::mqtt::mqttPackageName[fixedHeader.getType()] <<
")";
362 LOG(DEBUG) << connectionName <<
" MQTT: PacketFlags: 0x" << std::hex << std::setfill(
'0') << std::setw(2)
363 <<
static_cast<uint16_t>(fixedHeader.getFlags()) << std::dec;
364 LOG(DEBUG) << connectionName <<
" MQTT: RemainingLength: " << fixedHeader.getRemainingLength();