SNode.C
Loading...
Searching...
No Matches
iot::mqtt::server::broker::Broker Class Reference

#include <Broker.h>

Collaboration diagram for iot::mqtt::server::broker::Broker:

Public Member Functions

 Broker (uint8_t maxQoS, const std::string &sessionStoreFileName)
 
 ~Broker ()
 
void publish (const std::string &originClientId, const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
 
Session * newSession (const std::string &clientId, iot::mqtt::server::Mqtt *mqtt)
 
Session * renewSession (const std::string &clientId, iot::mqtt::server::Mqtt *mqtt)
 
void restartSession (const std::string &clientId)
 
void retainSession (const std::string &clientId)
 
void deleteSession (const std::string &clientId)
 
bool hasSession (const std::string &clientId)
 
bool hasActiveSession (const std::string &clientId)
 
bool hasRetainedSession (const std::string &clientId)
 
bool isActiveSession (const std::string &clientId, const Mqtt *mqtt)
 
void appear (const std::string &clientId, const std::string &topic, uint8_t qoS)
 
void release (const std::string &topic)
 
uint8_t subscribe (const std::string &clientId, const std::string &topic, uint8_t qoS)
 
void unsubscribe (const std::string &clientId)
 
void unsubscribe (const std::string &clientId, const std::string &topic)
 
std::list< std::string > getSubscriptions (const std::string &clientId) const
 
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree () const
 
std::list< std::pair< std::string, std::string > > getRetainTree ()
 
void sendPublish (const std::string &clientId, Message &message, uint8_t qoS, bool retain)
 

Static Public Member Functions

static std::shared_ptr< Broker > instance (uint8_t maxQoS, const std::string &sessionStoreFileName)
 

Private Attributes

std::string sessionStoreFileName
 
uint8_t maxQoS
 
iot::mqtt::server::broker::SubscribtionTree subscribtionTree
 
iot::mqtt::server::broker::RetainTree retainTree
 
std::map< std::string, iot::mqtt::server::broker::Session > sessionStore
 

Detailed Description

Definition at line 73 of file Broker.h.

Constructor & Destructor Documentation

◆ Broker()

iot::mqtt::server::broker::Broker::Broker ( uint8_t  maxQoS,
const std::string &  sessionStoreFileName 
)

Definition at line 60 of file Broker.cpp.

62 , maxQoS(maxQoS)
63 , subscribtionTree(this)
64 , retainTree(this) {
65 if (!sessionStoreFileName.empty()) {
66 std::ifstream sessionStoreFile(sessionStoreFileName);
67
68 if (sessionStoreFile.is_open()) {
69 try {
70 nlohmann::json sessionStoreJson;
71
72 sessionStoreFile >> sessionStoreJson;
73
74 for (const auto& [clientId, sessionJson] : sessionStoreJson["session_store"].items()) {
75 sessionStore[clientId].fromJson(sessionJson);
76 }
77 retainTree.fromJson(sessionStoreJson["retain_tree"]);
78 subscribtionTree.fromJson(sessionStoreJson["subscribtion_tree"]);
79
80 LOG(INFO) << "MQTT Broker: Persistent session data loaded successful";
81 } catch (const nlohmann::json::exception&) {
82 LOG(INFO) << "MQTT Broker: Starting with empty session: Session store '" << sessionStoreFileName
83 << "' empty or corrupted";
84
85 sessionStore.clear();
86 retainTree.clear();
87 subscribtionTree.clear();
88 }
89
90 sessionStoreFile.close();
91 std::remove(sessionStoreFileName.data()); // NOLINT
92
93 LOG(INFO) << "MQTT Broker: Restoring saved session done";
94 } else {
95 PLOG(WARNING) << "MQTT Broker: Could not read session store '" << sessionStoreFileName << "'";
96 }
97 } else {
98 LOG(INFO) << "MQTT Broker: Session not reloaded: Session store filename empty";
99 }
100 }
iot::mqtt::server::broker::RetainTree retainTree
Definition Broker.h:113
std::map< std::string, iot::mqtt::server::broker::Session > sessionStore
Definition Broker.h:115
iot::mqtt::server::broker::SubscribtionTree subscribtionTree
Definition Broker.h:112
void fromJson(const nlohmann::json &json)
void fromJson(const nlohmann::json &json)

References iot::mqtt::server::broker::RetainTree::clear(), iot::mqtt::server::broker::SubscribtionTree::clear(), iot::mqtt::server::broker::RetainTree::fromJson(), iot::mqtt::server::broker::Session::fromJson(), iot::mqtt::server::broker::SubscribtionTree::fromJson(), maxQoS, retainTree, iot::mqtt::server::broker::RetainTree::RetainTree(), sessionStore, sessionStoreFileName, subscribtionTree, and iot::mqtt::server::broker::SubscribtionTree::SubscribtionTree().

Here is the call graph for this function:

◆ ~Broker()

iot::mqtt::server::broker::Broker::~Broker ( )

Definition at line 102 of file Broker.cpp.

102 {
103 if (!sessionStoreFileName.empty()) {
104 nlohmann::json sessionStoreJson;
105
106 for (auto& [clientId, session] : sessionStore) {
107 sessionStoreJson["session_store"][clientId] = session.toJson();
108 }
109 sessionStoreJson["retain_tree"] = retainTree.toJson();
110 sessionStoreJson["subscribtion_tree"] = subscribtionTree.toJson();
111
112 if (sessionStoreJson["session_store"].empty()) {
113 sessionStoreJson.erase("session_store");
114 }
115 if (sessionStoreJson["retain_tree"].empty()) {
116 sessionStoreJson.erase("retain_tree");
117 }
118 if (sessionStoreJson["subscribtion_tree"].empty()) {
119 sessionStoreJson.erase("subscribtion_tree");
120 }
121
122 std::ofstream sessionStoreFile(sessionStoreFileName);
123
124 if (sessionStoreFile.is_open()) {
125 if (!sessionStoreJson.empty()) {
126 sessionStoreFile << sessionStoreJson;
127 }
128
129 sessionStoreFile.close();
130
131 LOG(INFO) << "MQTT Broker: Session store written '" << sessionStoreFileName << "'";
132 } else {
133 PLOG(ERROR) << "MQTT Broker: Could not write session store '" << sessionStoreFileName << "'";
134 }
135 } else {
136 LOG(INFO) << "MQTT Broker: Session not saved: Session store filename empty";
137 }
138 }

References retainTree, sessionStore, sessionStoreFileName, subscribtionTree, iot::mqtt::server::broker::RetainTree::toJson(), iot::mqtt::server::broker::SubscribtionTree::toJson(), and iot::mqtt::server::broker::Session::toJson().

Here is the call graph for this function:

Member Function Documentation

◆ appear()

void iot::mqtt::server::broker::Broker::appear ( const std::string &  clientId,
const std::string &  topic,
uint8_t  qoS 
)

Definition at line 198 of file Broker.cpp.

198 {
199 retainTree.appear(clientId, topic, qoS);
200 }
void appear(const std::string &clientId, const std::string &topic, uint8_t qoS)

References iot::mqtt::server::broker::RetainTree::appear(), and retainTree.

Referenced by iot::mqtt::server::broker::SubscribtionTree::TopicLevel::appear().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ deleteSession()

void iot::mqtt::server::broker::Broker::deleteSession ( const std::string &  clientId)

Definition at line 177 of file Broker.cpp.

177 {
178 subscribtionTree.unsubscribe(clientId);
179 sessionStore.erase(clientId);
180 }
void unsubscribe(const std::string &topic, const std::string &clientId)

References sessionStore, subscribtionTree, and iot::mqtt::server::broker::SubscribtionTree::unsubscribe().

Referenced by iot::mqtt::server::Mqtt::releaseSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getRetainTree()

std::list< std::pair< std::string, std::string > > iot::mqtt::server::broker::Broker::getRetainTree ( )

Definition at line 237 of file Broker.cpp.

237 {
238 return retainTree.getRetainTree();
239 }
std::list< std::pair< std::string, std::string > > getRetainTree() const

References iot::mqtt::server::broker::RetainTree::getRetainTree(), and retainTree.

Here is the call graph for this function:

◆ getSubscriptions()

std::list< std::string > iot::mqtt::server::broker::Broker::getSubscriptions ( const std::string &  clientId) const

Definition at line 229 of file Broker.cpp.

229 {
230 return subscribtionTree.getSubscriptions(clientId);
231 }
std::list< std::string > getSubscriptions(const std::string &clientId) const

References iot::mqtt::server::broker::SubscribtionTree::getSubscriptions(), and subscribtionTree.

Referenced by iot::mqtt::server::Mqtt::getSubscriptions().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getSubscriptionTree()

std::map< std::string, std::list< std::pair< std::string, uint8_t > > > iot::mqtt::server::broker::Broker::getSubscriptionTree ( ) const

Definition at line 233 of file Broker.cpp.

233 {
234 return subscribtionTree.getSubscriptionTree();
235 }
std::map< std::string, std::list< std::pair< std::string, uint8_t > > > getSubscriptionTree() const

References iot::mqtt::server::broker::SubscribtionTree::getSubscriptionTree(), and subscribtionTree.

Here is the call graph for this function:

◆ hasActiveSession()

bool iot::mqtt::server::broker::Broker::hasActiveSession ( const std::string &  clientId)

Definition at line 186 of file Broker.cpp.

186 {
187 return hasSession(clientId) && sessionStore[clientId].isActive();
188 }
bool hasSession(const std::string &clientId)
Definition Broker.cpp:182

References hasSession(), iot::mqtt::server::broker::Session::isActive(), and sessionStore.

Referenced by iot::mqtt::server::Mqtt::initSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ hasRetainedSession()

bool iot::mqtt::server::broker::Broker::hasRetainedSession ( const std::string &  clientId)

Definition at line 190 of file Broker.cpp.

190 {
191 return hasSession(clientId) && !sessionStore[clientId].isActive();
192 }

References hasSession(), iot::mqtt::server::broker::Session::isActive(), and sessionStore.

Referenced by iot::mqtt::server::Mqtt::initSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ hasSession()

bool iot::mqtt::server::broker::Broker::hasSession ( const std::string &  clientId)

Definition at line 182 of file Broker.cpp.

182 {
183 return sessionStore.contains(clientId);
184 }

References sessionStore.

Referenced by hasActiveSession(), hasRetainedSession(), and isActiveSession().

Here is the caller graph for this function:

◆ instance()

std::shared_ptr< Broker > iot::mqtt::server::broker::Broker::instance ( uint8_t  maxQoS,
const std::string &  sessionStoreFileName 
)
static

Definition at line 140 of file Broker.cpp.

140 {
141 static const std::shared_ptr<Broker> broker = std::make_shared<Broker>(maxQoS, sessionStoreFileName);
142
143 return broker;
144 }

Referenced by iot::mqtt::server::SharedSocketContextFactory::create().

Here is the caller graph for this function:

◆ isActiveSession()

bool iot::mqtt::server::broker::Broker::isActiveSession ( const std::string &  clientId,
const Mqtt *  mqtt 
)

Definition at line 194 of file Broker.cpp.

194 {
195 return hasSession(clientId) && sessionStore[clientId].isOwnedBy(mqtt);
196 }

References hasSession(), iot::mqtt::server::broker::Session::isOwnedBy(), and sessionStore.

Referenced by iot::mqtt::server::Mqtt::releaseSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ newSession()

Session * iot::mqtt::server::broker::Broker::newSession ( const std::string &  clientId,
iot::mqtt::server::Mqtt *  mqtt 
)

Definition at line 155 of file Broker.cpp.

155 {
157
158 return &sessionStore[clientId];
159 }

References iot::mqtt::server::broker::Session::Session(), and sessionStore.

Referenced by iot::mqtt::server::Mqtt::initSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ publish()

void iot::mqtt::server::broker::Broker::publish ( const std::string &  originClientId,
const std::string &  topic,
const std::string &  message,
uint8_t  qoS,
bool  retain 
)

Definition at line 147 of file Broker.cpp.

147 {
148 subscribtionTree.publish(Message(originClientId, topic, message, qoS, retain));
149
150 if (retain) {
151 retainTree.retain(Message(originClientId, topic, message, qoS, retain));
152 }
153 }

References iot::mqtt::server::broker::Message::Message(), iot::mqtt::server::broker::SubscribtionTree::publish(), iot::mqtt::server::broker::RetainTree::retain(), retainTree, and subscribtionTree.

Referenced by iot::mqtt::server::Mqtt::_onPublish(), and iot::mqtt::server::Mqtt::~Mqtt().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ release()

void iot::mqtt::server::broker::Broker::release ( const std::string &  topic)

Definition at line 202 of file Broker.cpp.

202 {
203 retainTree.release(topic);
204 }
void release(const std::string &topic)

References iot::mqtt::server::broker::RetainTree::release(), and retainTree.

Here is the call graph for this function:

◆ renewSession()

Session * iot::mqtt::server::broker::Broker::renewSession ( const std::string &  clientId,
iot::mqtt::server::Mqtt *  mqtt 
)

Definition at line 161 of file Broker.cpp.

161 {
162 return sessionStore[clientId].renew(mqtt);
163 }

References iot::mqtt::server::broker::Session::renew(), and sessionStore.

Referenced by iot::mqtt::server::Mqtt::initSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ restartSession()

void iot::mqtt::server::broker::Broker::restartSession ( const std::string &  clientId)

Definition at line 165 of file Broker.cpp.

165 {
166 LOG(INFO) << "MQTT Broker: Retained: Send PUBLISH: " << clientId;
167 subscribtionTree.appear(clientId);
168
169 LOG(INFO) << "MQTT Broker: Queued: Send PUBLISH: " << clientId;
170 sessionStore[clientId].publishQueued();
171 }
void appear(const std::string &clientId)

References iot::mqtt::server::broker::SubscribtionTree::appear(), iot::mqtt::server::broker::Session::publishQueued(), sessionStore, and subscribtionTree.

Referenced by iot::mqtt::server::Mqtt::initSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ retainSession()

void iot::mqtt::server::broker::Broker::retainSession ( const std::string &  clientId)

Definition at line 173 of file Broker.cpp.

173 {
174 sessionStore[clientId].retain();
175 }

References iot::mqtt::server::broker::Session::retain(), and sessionStore.

Referenced by iot::mqtt::server::Mqtt::releaseSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ sendPublish()

void iot::mqtt::server::broker::Broker::sendPublish ( const std::string &  clientId,
Message &  message,
uint8_t  qoS,
bool  retain 
)

Definition at line 241 of file Broker.cpp.

241 {
242 LOG(INFO) << "MQTT Broker: Send PUBLISH: " << clientId;
243
244 sessionStore[clientId].sendPublish(message, qoS, retain);
245 }

References iot::mqtt::server::broker::Session::sendPublish(), and sessionStore.

Referenced by iot::mqtt::server::broker::RetainTree::TopicLevel::appear(), iot::mqtt::server::broker::RetainTree::TopicLevel::appear(), and iot::mqtt::server::broker::SubscribtionTree::TopicLevel::publish().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ subscribe()

uint8_t iot::mqtt::server::broker::Broker::subscribe ( const std::string &  clientId,
const std::string &  topic,
uint8_t  qoS 
)

Definition at line 206 of file Broker.cpp.

206 {
207 qoS = std::min(maxQoS, qoS);
208 uint8_t returnCode = 0;
209
210 if (subscribtionTree.subscribe(topic, clientId, qoS)) {
211 retainTree.appear(clientId, topic, qoS);
212
213 returnCode = SUBSCRIBTION_SUCCESS | qoS;
214 } else {
215 returnCode = SUBSCRIBTION_FAILURE;
216 }
217
218 return returnCode;
219 }
#define SUBSCRIBTION_FAILURE
Definition Broker.h:69
#define SUBSCRIBTION_SUCCESS
Definition Broker.h:68
bool subscribe(const std::string &topic, const std::string &clientId, uint8_t qoS)

References iot::mqtt::server::broker::RetainTree::appear(), maxQoS, retainTree, iot::mqtt::server::broker::SubscribtionTree::subscribe(), and subscribtionTree.

Referenced by iot::mqtt::server::Mqtt::_onSubscribe(), and iot::mqtt::server::Mqtt::subscribe().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ unsubscribe() [1/2]

void iot::mqtt::server::broker::Broker::unsubscribe ( const std::string &  clientId)

Definition at line 221 of file Broker.cpp.

221 {
222 subscribtionTree.unsubscribe(clientId);
223 }

References subscribtionTree, and iot::mqtt::server::broker::SubscribtionTree::unsubscribe().

Referenced by iot::mqtt::server::Mqtt::initSession().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ unsubscribe() [2/2]

void iot::mqtt::server::broker::Broker::unsubscribe ( const std::string &  clientId,
const std::string &  topic 
)

Definition at line 225 of file Broker.cpp.

225 {
226 subscribtionTree.unsubscribe(topic, clientId);
227 }

References subscribtionTree, and iot::mqtt::server::broker::SubscribtionTree::unsubscribe().

Referenced by iot::mqtt::server::Mqtt::_onUnsubscribe(), and iot::mqtt::server::Mqtt::unsubscribe().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ maxQoS

uint8_t iot::mqtt::server::broker::Broker::maxQoS
private

Definition at line 110 of file Broker.h.

Referenced by Broker(), and subscribe().

◆ retainTree

iot::mqtt::server::broker::RetainTree iot::mqtt::server::broker::Broker::retainTree
private

Definition at line 113 of file Broker.h.

Referenced by appear(), Broker(), getRetainTree(), publish(), release(), subscribe(), and ~Broker().

◆ sessionStore

std::map<std::string, iot::mqtt::server::broker::Session> iot::mqtt::server::broker::Broker::sessionStore
private

◆ sessionStoreFileName

std::string iot::mqtt::server::broker::Broker::sessionStoreFileName
private

Definition at line 109 of file Broker.h.

Referenced by Broker(), and ~Broker().

◆ subscribtionTree

iot::mqtt::server::broker::SubscribtionTree iot::mqtt::server::broker::Broker::subscribtionTree
private

The documentation for this class was generated from the following files: