SNode.C
Loading...
Searching...
No Matches
iot::mqtt::server::broker::Broker Class Reference

#include <Broker.h>

Collaboration diagram for iot::mqtt::server::broker::Broker:

Public Member Functions

 Broker (uint8_t maxQoS)
 
 ~Broker ()
 
void appear (const std::string &clientId, const std::string &topic, uint8_t qoS)
 
void unsubscribe (const std::string &clientId)
 
void publish (const std::string &originClientId, const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
 
uint8_t subscribe (const std::string &clientId, const std::string &topic, uint8_t qoS)
 
void unsubscribe (const std::string &clientId, const std::string &topic)
 
bool hasSession (const std::string &clientId)
 
bool hasActiveSession (const std::string &clientId)
 
bool hasRetainedSession (const std::string &clientId)
 
bool isActiveSession (const std::string &clientId, const Mqtt *mqtt)
 
Session * newSession (const std::string &clientId, iot::mqtt::server::Mqtt *mqtt)
 
Session * renewSession (const std::string &clientId, iot::mqtt::server::Mqtt *mqtt)
 
void restartSession (const std::string &clientId)
 
void retainSession (const std::string &clientId)
 
void deleteSession (const std::string &clientId)
 
void sendPublish (const std::string &clientId, Message &message, uint8_t qoS, bool retain)
 

Static Public Member Functions

static std::shared_ptr< Broker > instance (uint8_t maxQoS)
 

Private Attributes

std::string sessionStoreFileName
 
uint8_t maxQoS
 
iot::mqtt::server::broker::SubscribtionTree subscribtionTree
 
iot::mqtt::server::broker::RetainTree retainTree
 
std::map< std::string, iot::mqtt::server::broker::Session > sessionStore
 

Detailed Description

Definition at line 47 of file Broker.h.

Constructor & Destructor Documentation

◆ Broker()

iot::mqtt::server::broker::Broker::Broker ( uint8_t maxQoS)
explicit

Definition at line 39 of file Broker.cpp.

40 : sessionStoreFileName((getenv("MQTT_SESSION_STORE") != nullptr) ? getenv("MQTT_SESSION_STORE") : "") // NOLINT
41 , maxQoS(maxQoS)
42 , subscribtionTree(this)
43 , retainTree(this) {
44 if (!sessionStoreFileName.empty()) {
45 std::ifstream sessionStoreFile(sessionStoreFileName);
46
47 if (sessionStoreFile.is_open()) {
48 try {
49 nlohmann::json sessionStoreJson;
50
51 sessionStoreFile >> sessionStoreJson;
52
53 for (const auto& [clientId, sessionJson] : sessionStoreJson["session_store"].items()) {
54 sessionStore[clientId].fromJson(sessionJson);
55 }
56 retainTree.fromJson(sessionStoreJson["retain_tree"]);
57 subscribtionTree.fromJson(sessionStoreJson["subscribtion_tree"]);
58
59 LOG(INFO) << "MQTT Broker: Persistent session data loaded successful";
60 } catch (const nlohmann::json::exception&) {
61 LOG(INFO) << "MQTT Broker: Starting with empty session: Session store '" << sessionStoreFileName
62 << "' empty or corrupted";
63
64 sessionStore.clear();
67 }
68
69 sessionStoreFile.close();
70 std::remove(sessionStoreFileName.data()); // NOLINT
71
72 LOG(INFO) << "MQTT Broker: Restoring saved session done";
73 } else {
74 PLOG(WARNING) << "MQTT Broker: Could not read session store '" << sessionStoreFileName << "'";
75 }
76 } else {
77 LOG(INFO) << "MQTT Broker: Session not reloaded: Session store filename empty";
78 }
79 }
iot::mqtt::server::broker::RetainTree retainTree
Definition Broker.h:81
std::map< std::string, iot::mqtt::server::broker::Session > sessionStore
Definition Broker.h:83
iot::mqtt::server::broker::SubscribtionTree subscribtionTree
Definition Broker.h:80
void fromJson(const nlohmann::json &json)
void fromJson(const nlohmann::json &json)

References Broker(), and maxQoS.

Referenced by Broker().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ~Broker()

iot::mqtt::server::broker::Broker::~Broker ( )

Definition at line 81 of file Broker.cpp.

81 {
82 if (!sessionStoreFileName.empty()) {
83 nlohmann::json sessionStoreJson;
84
85 for (auto& [clientId, session] : sessionStore) {
86 sessionStoreJson["session_store"][clientId] = session.toJson();
87 }
88 sessionStoreJson["retain_tree"] = retainTree.toJson();
89 sessionStoreJson["subscribtion_tree"] = subscribtionTree.toJson();
90
91 if (sessionStoreJson["session_store"].empty()) {
92 sessionStoreJson.erase("session_store");
93 }
94 if (sessionStoreJson["retain_tree"].empty()) {
95 sessionStoreJson.erase("retain_tree");
96 }
97 if (sessionStoreJson["subscribtion_tree"].empty()) {
98 sessionStoreJson.erase("subscribtion_tree");
99 }
100
101 std::ofstream sessionStoreFile(sessionStoreFileName);
102
103 if (sessionStoreFile.is_open()) {
104 if (!sessionStoreJson.empty()) {
105 sessionStoreFile << sessionStoreJson;
106 }
107
108 sessionStoreFile.close();
109
110 LOG(INFO) << "MQTT Broker: Session store written '" << sessionStoreFileName << "'";
111 } else {
112 PLOG(ERROR) << "MQTT Broker: Could not write session store '" << sessionStoreFileName << "'";
113 }
114 } else {
115 LOG(INFO) << "MQTT Broker: Session not saved: Session store filename empty";
116 }
117 }

Member Function Documentation

◆ appear()

void iot::mqtt::server::broker::Broker::appear ( const std::string & clientId,
const std::string & topic,
uint8_t qoS )

Definition at line 125 of file Broker.cpp.

125 {
126 retainTree.appear(clientId, topic, qoS);
127 }
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)

◆ deleteSession()

void iot::mqtt::server::broker::Broker::deleteSession ( const std::string & clientId)

Definition at line 199 of file Broker.cpp.

199 {
201 sessionStore.erase(clientId);
202 }
void unsubscribe(const std::string &topic, const std::string &clientId)

◆ hasActiveSession()

bool iot::mqtt::server::broker::Broker::hasActiveSession ( const std::string & clientId)

Definition at line 165 of file Broker.cpp.

165 {
166 return hasSession(clientId) && sessionStore[clientId].isActive();
167 }
bool hasSession(const std::string &clientId)
Definition Broker.cpp:161

◆ hasRetainedSession()

bool iot::mqtt::server::broker::Broker::hasRetainedSession ( const std::string & clientId)

Definition at line 169 of file Broker.cpp.

169 {
170 return hasSession(clientId) && !sessionStore[clientId].isActive();
171 }

◆ hasSession()

bool iot::mqtt::server::broker::Broker::hasSession ( const std::string & clientId)

Definition at line 161 of file Broker.cpp.

161 {
162 return sessionStore.contains(clientId);
163 }

◆ instance()

std::shared_ptr< Broker > iot::mqtt::server::broker::Broker::instance ( uint8_t maxQoS)
static

Definition at line 119 of file Broker.cpp.

119 {
120 static const std::shared_ptr<Broker> broker = std::make_shared<Broker>(maxQoS);
121
122 return broker;
123 }

◆ isActiveSession()

bool iot::mqtt::server::broker::Broker::isActiveSession ( const std::string & clientId,
const Mqtt * mqtt )

Definition at line 173 of file Broker.cpp.

173 {
174 return hasSession(clientId) && sessionStore[clientId].isOwnedBy(mqtt);
175 }

◆ newSession()

Session * iot::mqtt::server::broker::Broker::newSession ( const std::string & clientId,
iot::mqtt::server::Mqtt * mqtt )

Definition at line 177 of file Broker.cpp.

177 {
179
180 return &sessionStore[clientId];
181 }

◆ publish()

void iot::mqtt::server::broker::Broker::publish ( const std::string & originClientId,
const std::string & topic,
const std::string & message,
uint8_t qoS,
bool retain )

Definition at line 134 of file Broker.cpp.

134 {
135 subscribtionTree.publish(Message(originClientId, topic, message, qoS, retain));
136
137 if (retain) {
138 retainTree.retain(Message(originClientId, topic, message, qoS, retain));
139 }
140 }

◆ renewSession()

Session * iot::mqtt::server::broker::Broker::renewSession ( const std::string & clientId,
iot::mqtt::server::Mqtt * mqtt )

Definition at line 183 of file Broker.cpp.

183 {
184 return sessionStore[clientId].renew(mqtt);
185 }

◆ restartSession()

void iot::mqtt::server::broker::Broker::restartSession ( const std::string & clientId)

Definition at line 187 of file Broker.cpp.

187 {
188 LOG(INFO) << "MQTT Broker: Retained: Send PUBLISH: " << clientId;
189 subscribtionTree.appear(clientId);
190
191 LOG(INFO) << "MQTT Broker: Queued: Send PUBLISH: " << clientId;
192 sessionStore[clientId].publishQueued();
193 }
void appear(const std::string &clientId)

◆ retainSession()

void iot::mqtt::server::broker::Broker::retainSession ( const std::string & clientId)

Definition at line 195 of file Broker.cpp.

195 {
196 sessionStore[clientId].retain();
197 }

◆ sendPublish()

void iot::mqtt::server::broker::Broker::sendPublish ( const std::string & clientId,
Message & message,
uint8_t qoS,
bool retain )

Definition at line 204 of file Broker.cpp.

204 {
205 LOG(INFO) << "MQTT Broker: Send PUBLISH: " << clientId;
206
207 sessionStore[clientId].sendPublish(message, qoS, retain);
208 }

◆ subscribe()

uint8_t iot::mqtt::server::broker::Broker::subscribe ( const std::string & clientId,
const std::string & topic,
uint8_t qoS )

Definition at line 142 of file Broker.cpp.

142 {
143 qoS = std::min(maxQoS, qoS);
144 uint8_t returnCode = 0;
145
146 if (subscribtionTree.subscribe(topic, clientId, qoS)) {
147 retainTree.appear(clientId, topic, qoS);
148
149 returnCode = SUBSCRIBTION_SUCCESS | qoS;
150 } else {
151 returnCode = SUBSCRIBTION_FAILURE;
152 }
153
154 return returnCode;
155 }
#define SUBSCRIBTION_FAILURE
Definition Broker.h:43
#define SUBSCRIBTION_SUCCESS
Definition Broker.h:42
bool subscribe(const std::string &topic, const std::string &clientId, uint8_t qoS)

References maxQoS.

◆ unsubscribe() [1/2]

void iot::mqtt::server::broker::Broker::unsubscribe ( const std::string & clientId)

Definition at line 129 of file Broker.cpp.

129 {
131 }

◆ unsubscribe() [2/2]

void iot::mqtt::server::broker::Broker::unsubscribe ( const std::string & clientId,
const std::string & topic )

Definition at line 157 of file Broker.cpp.

157 {
158 subscribtionTree.unsubscribe(topic, clientId);
159 }

Member Data Documentation

◆ maxQoS

uint8_t iot::mqtt::server::broker::Broker::maxQoS
private

Definition at line 78 of file Broker.h.

Referenced by Broker(), and subscribe().

◆ retainTree

iot::mqtt::server::broker::RetainTree iot::mqtt::server::broker::Broker::retainTree
private

Definition at line 81 of file Broker.h.

◆ sessionStore

std::map<std::string, iot::mqtt::server::broker::Session> iot::mqtt::server::broker::Broker::sessionStore
private

Definition at line 83 of file Broker.h.

◆ sessionStoreFileName

std::string iot::mqtt::server::broker::Broker::sessionStoreFileName
private

Definition at line 77 of file Broker.h.

◆ subscribtionTree

iot::mqtt::server::broker::SubscribtionTree iot::mqtt::server::broker::Broker::subscribtionTree
private

Definition at line 80 of file Broker.h.


The documentation for this class was generated from the following files: