SNode.C
Loading...
Searching...
No Matches
Session.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "iot/mqtt/server/broker/Session.h"
21
22#include "iot/mqtt/server/Mqtt.h"
23
24#ifndef DOXYGEN_SHOULD_SKIP_THIS
25
26#include "log/Logger.h"
27
28#include <algorithm>
29#include <iterator>
30#include <map>
31#include <nlohmann/json.hpp>
32#include <string>
33
34// IWYU pragma: no_include <nlohmann/detail/iterators/iter_impl.hpp>
35
36#endif // DOXYGEN_SHOULD_SKIP_THIS
37
38namespace iot::mqtt::server::broker {
39
40 Session::Session(iot::mqtt::server::Mqtt* mqtt)
41 : mqtt(mqtt) {
42 }
43
44 void Session::sendPublish(Message& message, uint8_t qoS, bool retain) {
45 LOG(INFO) << "MQTT Broker: TopicName: " << message.getTopic();
46 LOG(INFO) << "MQTT Broker: Message:\n" << iot::mqtt::Mqtt::toHexString(message.getMessage());
47 LOG(DEBUG) << "MQTT Broker: QoS: " << static_cast<uint16_t>(std::min(qoS, message.getQoS()));
48
49 if (isActive()) {
50 LOG(DEBUG) << "MQTT Broker: ClientId: " << mqtt->getClientId();
51 LOG(DEBUG) << "MQTT Broker: OriginClientId: " << message.getOriginClientId();
52
53 if ((mqtt->getReflect() || mqtt->getClientId() != message.getOriginClientId())) {
54 mqtt->sendPublish(message.getTopic(),
55 message.getMessage(),
56 std::min(message.getQoS(), qoS),
57 !mqtt->getReflect() ? message.getOriginRetain() || retain : retain);
58 } else {
59 LOG(INFO) << "MQTT Broker: Suppress reflection to origin to avoid message looping";
60 }
61 } else {
62 if (message.getQoS() == 0) {
63 messageQueue.clear();
64 }
65
66 message.setQoS(std::min(message.getQoS(), qoS));
67 messageQueue.emplace_back(message);
68 }
69 }
70
72 LOG(INFO) << "MQTT Broker: send queued messages ...";
73 for (iot::mqtt::server::broker::Message& message : messageQueue) {
74 sendPublish(message, message.getQoS(), false);
75 }
76 LOG(INFO) << "MQTT Broker: ... done";
77
78 messageQueue.clear();
79 }
80
81 Session* Session::renew(iot::mqtt::server::Mqtt* mqtt) {
82 this->mqtt = mqtt;
83
84 return this;
85 }
86
87 void Session::retain() {
88 this->mqtt = nullptr;
89 }
90
91 bool Session::isActive() const {
92 return mqtt != nullptr;
93 }
94
95 bool Session::isOwnedBy(const iot::mqtt::server::Mqtt* mqtt) const {
96 return this->mqtt == mqtt;
97 }
98
100 nlohmann::json json = iot::mqtt::Session::toJson();
101
102 std::transform(messageQueue.begin(), messageQueue.end(), std::back_inserter(json["message_queue"]), [](const Message& message) {
103 return message.toJson();
104 });
105
106 return json;
107 }
108
109 void Session::fromJson(const nlohmann::json& json) {
110 std::transform(json["message_queue"].begin(),
111 json["message_queue"].end(),
112 std::back_inserter(messageQueue),
113 [](const nlohmann::json& jsonMessage) {
114 return Message().fromJson(jsonMessage);
115 });
116
117 iot::mqtt::Session::fromJson(json);
118 }
119
120} // namespace iot::mqtt::server::broker
bool getReflect() const
Definition Mqtt.cpp:402
void sendPublish(iot::mqtt::server::broker::Message &message, uint8_t qoS, bool retain)
Definition Session.cpp:44
nlohmann::json toJson() const final
Definition Session.cpp:99
Session(iot::mqtt::server::Mqtt *mqtt)
Definition Session.cpp:40
void fromJson(const nlohmann::json &json)
Definition Session.cpp:109
Session * renew(iot::mqtt::server::Mqtt *mqtt)
Definition Session.cpp:81
bool isOwnedBy(const iot::mqtt::server::Mqtt *mqtt) const
Definition Session.cpp:95