MQTTSuite
Loading...
Searching...
No Matches
Mqtt.h
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 * Tobias Pfeil
6 * 2025, 2026
7 *
8 * This program is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the Free
10 * Software Foundation, either version 3 of the License, or (at your option)
11 * any later version.
12 *
13 * This program is distributed in the hope that it will be useful, but WITHOUT
14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
15 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program. If not, see <https://www.gnu.org/licenses/>.
20 */
21
22/*
23 * MIT License
24 *
25 * Permission is hereby granted, free of charge, to any person obtaining a copy
26 * of this software and associated documentation files (the "Software"), to deal
27 * in the Software without restriction, including without limitation the rights
28 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
29 * copies of the Software, and to permit persons to whom the Software is
30 * furnished to do so, subject to the following conditions:
31 *
32 * The above copyright notice and this permission notice shall be included in
33 * all copies or substantial portions of the Software.
34 *
35 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
36 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
37 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
38 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
39 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
40 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
41 * THE SOFTWARE.
42 */
43
44#ifndef APPS_MQTTBROKER_MQTTINTEGRATOR_SOCKETCONTEXT_H
45#define APPS_MQTTBROKER_MQTTINTEGRATOR_SOCKETCONTEXT_H
46
47#include <iot/mqtt/client/Mqtt.h>
48
49namespace mqtt::lib {
50 class MqttMapper;
51 namespace admin {
52 struct ReloadResult;
53 }
54} // namespace mqtt::lib
55
56namespace iot::mqtt {
57 class Topic;
58 namespace packets {
59 class Publish;
60 }
61} // namespace iot::mqtt
62
63#ifndef DOXYGEN_SHOULD_SKIP_THIS
64
65#include <cstddef>
66#include <list>
67#include <memory>
68#include <queue>
69#include <set>
70#include <string>
71#include <utility>
72#include <vector>
73
74#endif
75
76namespace mqtt::mqttintegrator::lib {
77
78 class Mqtt : public iot::mqtt::client::Mqtt {
79 public:
80 explicit Mqtt(const std::string& connectionName,
81 std::shared_ptr<mqtt::lib::MqttMapper> mqttMapper,
82 const std::string& sessionStoreFileName);
83
84 ~Mqtt() override;
85 static mqtt::lib::admin::ReloadResult updateSubscriptions(bool mustReconnect);
86
87 private:
88 using Super = iot::mqtt::client::Mqtt;
89
90 struct ScheduledPublish;
91
92 void onConnected() final;
93 [[nodiscard]] bool onSignal(int signum) final;
94
95 void onConnack(const iot::mqtt::packets::Connack& connack) final;
96 void onPublish(const iot::mqtt::packets::Publish& publish) final;
97
98 std::pair<std::size_t, std::size_t> resubscribe();
99
100 std::shared_ptr<mqtt::lib::MqttMapper> mqttMapper;
101 std::list<iot::mqtt::Topic> currentSubscriptions;
102
104 private:
106 bool operator()(const ScheduledPublish& a, const ScheduledPublish& b) const;
107 };
108
109 public:
110 explicit DelayedQueue(Mqtt* mqtt);
111 ~DelayedQueue();
112
113 void delayPublish(const utils::Timeval& delay, const iot::mqtt::packets::Publish& publish);
114
115 bool empty() const;
116 ScheduledPublish const& top() const;
117 void pop();
118
119 private:
121 std::size_t nextSeq = 0;
122 std::priority_queue<ScheduledPublish, std::vector<ScheduledPublish>, EarlierFirst> minHeap;
123
124 core::timer::Timer delayTimer;
125 void processDue();
126 void armDelayTimer();
127 } delayedQueue;
128
129 static std::set<Mqtt*> mqttInstances;
130 };
131
132} // namespace mqtt::mqttintegrator::lib
133
134#endif // APPS_MQTTBROKER_MQTTINTEGRATOR_SOCKETCONTEXT_H
std::list< iot::mqtt::Topic > extractSubscriptions() const
MappedPublishes getMappings(const iot::mqtt::packets::Publish &publish)
const nlohmann::json & getConnection() const
std::priority_queue< ScheduledPublish, std::vector< ScheduledPublish >, EarlierFirst > minHeap
Definition Mqtt.h:122
ScheduledPublish const & top() const
Definition Mqtt.cpp:245
void delayPublish(const utils::Timeval &delay, const iot::mqtt::packets::Publish &publish)
Definition Mqtt.cpp:236
static std::set< Mqtt * > mqttInstances
Definition Mqtt.h:129
void onPublish(const iot::mqtt::packets::Publish &publish) final
Definition Mqtt.cpp:136
void onConnack(const iot::mqtt::packets::Connack &connack) final
Definition Mqtt.cpp:130
std::pair< std::size_t, std::size_t > resubscribe()
Definition Mqtt.cpp:150
static mqtt::lib::admin::ReloadResult updateSubscriptions(bool mustReconnect)
Definition Mqtt.cpp:89
std::shared_ptr< mqtt::lib::MqttMapper > mqttMapper
Definition Mqtt.h:100
std::list< iot::mqtt::Topic > currentSubscriptions
Definition Mqtt.h:101
bool onSignal(int signum) final
Definition Mqtt.cpp:125
iot::mqtt::client::Mqtt Super
Definition Mqtt.h:88
iot::mqtt::packets::Publish publish
Definition MqttMapper.h:80
bool operator()(const ScheduledPublish &a, const ScheduledPublish &b) const
Definition Mqtt.cpp:188
iot::mqtt::packets::Publish publish
Definition Mqtt.cpp:68