MQTTSuite
Loading...
Searching...
No Matches
mqtt::mqttintegrator::lib::Mqtt Class Reference

#include <Mqtt.h>

Inheritance diagram for mqtt::mqttintegrator::lib::Mqtt:
Collaboration diagram for mqtt::mqttintegrator::lib::Mqtt:

Classes

class  DelayedQueue
struct  ScheduledPublish

Public Member Functions

 Mqtt (const std::string &connectionName, std::shared_ptr< mqtt::lib::MqttMapper > mqttMapper, const std::string &sessionStoreFileName)
 ~Mqtt () override

Static Public Member Functions

static mqtt::lib::admin::ReloadResult updateSubscriptions (bool mustReconnect)

Private Types

using Super = iot::mqtt::client::Mqtt

Private Member Functions

void onConnected () final
bool onSignal (int signum) final
void onConnack (const iot::mqtt::packets::Connack &connack) final
void onPublish (const iot::mqtt::packets::Publish &publish) final
std::pair< std::size_t, std::size_t > resubscribe ()

Private Attributes

std::shared_ptr< mqtt::lib::MqttMappermqttMapper
std::list< iot::mqtt::Topic > currentSubscriptions
class mqtt::mqttintegrator::lib::Mqtt::DelayedQueue delayedQueue

Static Private Attributes

static std::set< Mqtt * > mqttInstances

Detailed Description

Definition at line 78 of file Mqtt.h.

Member Typedef Documentation

◆ Super

using mqtt::mqttintegrator::lib::Mqtt::Super = iot::mqtt::client::Mqtt
private

Definition at line 88 of file Mqtt.h.

Constructor & Destructor Documentation

◆ Mqtt()

mqtt::mqttintegrator::lib::Mqtt::Mqtt ( const std::string & connectionName,
std::shared_ptr< mqtt::lib::MqttMapper > mqttMapper,
const std::string & sessionStoreFileName )
explicit

Definition at line 72 of file Mqtt.cpp.

75 : iot::mqtt::client::Mqtt(connectionName, //
76 mqttMapper->getConnection()["client_id"],
77 mqttMapper->getConnection()["keep_alive"],
78 sessionStoreFileName)
80 , currentSubscriptions(mqttMapper->extractSubscriptions())
81 , delayedQueue(this) {
82 mqttInstances.insert(this);
83 }
static std::set< Mqtt * > mqttInstances
Definition Mqtt.h:129
class mqtt::mqttintegrator::lib::Mqtt::DelayedQueue delayedQueue
std::shared_ptr< mqtt::lib::MqttMapper > mqttMapper
Definition Mqtt.h:100
std::list< iot::mqtt::Topic > currentSubscriptions
Definition Mqtt.h:101

References currentSubscriptions, mqtt::mqttintegrator::lib::Mqtt::DelayedQueue::DelayedQueue(), mqtt::lib::MqttMapper::extractSubscriptions(), mqtt::lib::MqttMapper::getConnection(), mqttInstances, and mqttMapper.

Here is the call graph for this function:

◆ ~Mqtt()

mqtt::mqttintegrator::lib::Mqtt::~Mqtt ( )
override

Definition at line 85 of file Mqtt.cpp.

85 {
86 mqttInstances.erase(this);
87 }

References mqttInstances.

Member Function Documentation

◆ onConnack()

void mqtt::mqttintegrator::lib::Mqtt::onConnack ( const iot::mqtt::packets::Connack & connack)
finalprivate

Definition at line 130 of file Mqtt.cpp.

130 {
131 if (connack.getReturnCode() == 0 && !connack.getSessionPresent()) {
132 sendSubscribe(currentSubscriptions);
133 }
134 }

References currentSubscriptions.

◆ onConnected()

void mqtt::mqttintegrator::lib::Mqtt::onConnected ( )
finalprivate

Definition at line 113 of file Mqtt.cpp.

113 {
114 const nlohmann::json& connection = mqttMapper->getConnection();
115
116 sendConnect(connection["clean_session"],
117 connection["will_topic"],
118 connection["will_message"],
119 connection["will_qos"],
120 connection["will_retain"],
121 connection["username"],
122 connection["password"]);
123 }

References mqtt::lib::MqttMapper::getConnection(), and mqttMapper.

Here is the call graph for this function:

◆ onPublish()

void mqtt::mqttintegrator::lib::Mqtt::onPublish ( const iot::mqtt::packets::Publish & publish)
finalprivate

Definition at line 136 of file Mqtt.cpp.

136 {
137 auto [immediatePublishes, scheduledPublishes] = mqttMapper->getMappings(publish);
138
139 for (const mqtt::lib::MqttMapper::ScheduledPublish& delayedPublish : scheduledPublishes) {
140 delayedQueue.delayPublish(delayedPublish.delay, delayedPublish.publish);
141 }
142
143 for (const iot::mqtt::packets::Publish& mappedPublish : immediatePublishes) {
144 sendPublish(mappedPublish.getTopic(), mappedPublish.getMessage(), mappedPublish.getQoS(), mappedPublish.getRetain());
145
146 onPublish(mappedPublish);
147 }
148 }
void onPublish(const iot::mqtt::packets::Publish &publish) final
Definition Mqtt.cpp:136

References mqtt::lib::MqttMapper::ScheduledPublish::delay, mqtt::mqttintegrator::lib::Mqtt::DelayedQueue::delayPublish(), mqtt::lib::MqttMapper::getMappings(), mqttMapper, onPublish(), and mqtt::lib::MqttMapper::ScheduledPublish::publish.

Referenced by onPublish(), and mqtt::mqttintegrator::lib::Mqtt::DelayedQueue::processDue().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ onSignal()

bool mqtt::mqttintegrator::lib::Mqtt::onSignal ( int signum)
nodiscardfinalprivate

Definition at line 125 of file Mqtt.cpp.

125 {
126 sendDisconnect();
127 return Super::onSignal(signum);
128 }

◆ resubscribe()

std::pair< std::size_t, std::size_t > mqtt::mqttintegrator::lib::Mqtt::resubscribe ( )
private

Definition at line 150 of file Mqtt.cpp.

150 {
151 std::list<iot::mqtt::Topic> newSubscriptions = mqttMapper->extractSubscriptions();
152
153 std::list<std::string> topicsToUnsubscribe;
154 for (const auto& currentTopic : currentSubscriptions) {
155 const bool existsInNew = std::any_of(newSubscriptions.begin(), newSubscriptions.end(), [&](const auto& newTopic) {
156 return currentTopic.getName() == newTopic.getName() && currentTopic.getQoS() == newTopic.getQoS();
157 });
158
159 if (!existsInNew) {
160 topicsToUnsubscribe.push_back(currentTopic.getName());
161 }
162 }
163
164 if (!topicsToUnsubscribe.empty()) {
165 sendUnsubscribe(topicsToUnsubscribe);
166 }
167
168 std::list<iot::mqtt::Topic> topicsToSubscribe;
169 for (const auto& newTopic : newSubscriptions) {
170 const bool existsInOld = std::any_of(currentSubscriptions.begin(), currentSubscriptions.end(), [&](const auto& currentTopic) {
171 return currentTopic.getName() == newTopic.getName() && currentTopic.getQoS() == newTopic.getQoS();
172 });
173
174 if (!existsInOld) {
175 topicsToSubscribe.push_back(newTopic);
176 }
177 }
178
179 if (!topicsToSubscribe.empty()) {
180 sendSubscribe(topicsToSubscribe);
181 }
182
183 currentSubscriptions = newSubscriptions;
184
185 return {topicsToSubscribe.size(), topicsToUnsubscribe.size()};
186 }

References currentSubscriptions, mqtt::lib::MqttMapper::extractSubscriptions(), and mqttMapper.

Referenced by updateSubscriptions().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ updateSubscriptions()

mqtt::lib::admin::ReloadResult mqtt::mqttintegrator::lib::Mqtt::updateSubscriptions ( bool mustReconnect)
static

Definition at line 89 of file Mqtt.cpp.

89 {
90 mqtt::lib::admin::ReloadResult reloadResult;
91
92 reloadResult.instances = mqttInstances.size();
93 if (mustReconnect) {
94 reloadResult.mode = "reconnect";
95 } else {
96 reloadResult.mode = "hot";
97 }
98
99 for (Mqtt* mqtt : mqttInstances) {
100 if (mustReconnect) {
101 mqtt->sendDisconnect();
102 } else {
103 auto [subscribeCount, unsubscribeCount] = mqtt->resubscribe();
104
105 reloadResult.subscribed += subscribeCount;
106 reloadResult.unsubscribed += unsubscribeCount;
107 }
108 }
109
110 return reloadResult;
111 }
Mqtt(const std::string &connectionName, std::shared_ptr< mqtt::lib::MqttMapper > mqttMapper, const std::string &sessionStoreFileName)
Definition Mqtt.cpp:72

References mqtt::lib::admin::ReloadResult::instances, mqtt::lib::admin::ReloadResult::mode, mqttInstances, resubscribe(), mqtt::lib::admin::ReloadResult::subscribed, and mqtt::lib::admin::ReloadResult::unsubscribed.

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ currentSubscriptions

std::list<iot::mqtt::Topic> mqtt::mqttintegrator::lib::Mqtt::currentSubscriptions
private

Definition at line 101 of file Mqtt.h.

Referenced by Mqtt(), onConnack(), and resubscribe().

◆ delayedQueue

class mqtt::mqttintegrator::lib::Mqtt::DelayedQueue mqtt::mqttintegrator::lib::Mqtt::delayedQueue
private

◆ mqttInstances

std::set< Mqtt * > mqtt::mqttintegrator::lib::Mqtt::mqttInstances
staticprivate

Definition at line 129 of file Mqtt.h.

Referenced by Mqtt(), updateSubscriptions(), and ~Mqtt().

◆ mqttMapper

std::shared_ptr<mqtt::lib::MqttMapper> mqtt::mqttintegrator::lib::Mqtt::mqttMapper
private

Definition at line 100 of file Mqtt.h.

Referenced by Mqtt(), onConnected(), onPublish(), and resubscribe().


The documentation for this class was generated from the following files: