MQTTSuite
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "Mqtt.h"
43
44#include <iot/mqtt/Topic.h>
45#include <iot/mqtt/packets/Connack.h>
46#include <iot/mqtt/packets/Publish.h>
47#include <iot/mqtt/packets/Suback.h>
48
49#ifndef DOXYGEN_SHOULD_SKIP_THIS
50
51#include <algorithm>
52#include <cstring>
53#include <iterator>
54#include <list>
55#include <log/Logger.h>
56#include <map>
57#include <nlohmann/json_fwd.hpp>
58#include <sstream>
59#include <stdexcept>
60#include <string>
61#include <string_view>
62#include <sys/ioctl.h>
63#include <tuple>
64#include <unistd.h>
65#include <utils/system/signal.h>
66#include <vector>
67
68#endif
69
70// include the single‐header JSON library:
71// https://github.com/nlohmann/json/releases
72#include <nlohmann/json.hpp>
73using json = nlohmann::json;
74
/// Returns the column count of the terminal attached to stdout.
/// Falls back to 80 when the window size cannot be queried or is reported as 0.
static int getTerminalWidth() {
    constexpr int fallbackWidth = 80;

    struct winsize windowSize {};
    const bool queried = ioctl(STDOUT_FILENO, TIOCGWINSZ, &windowSize) == 0;

    return (queried && windowSize.ws_col > 0) ? windowSize.ws_col : fallbackWidth;
}
86
/// Greedily packs the whitespace-separated words of `text` into lines of at
/// most `width` characters (bytes). Words are joined by a single space.
/// A word longer than `width` is never broken; it ends up on a line of its own.
/// Returns an empty vector when `text` contains no words.
static std::vector<std::string> wrapParagraph(const std::string& text, std::size_t width) {
    std::vector<std::string> wrappedLines;
    std::string current;

    std::istringstream tokenizer(text);
    for (std::string token; tokenizer >> token;) {
        const bool fitsOnCurrent = current.size() + 1 + token.size() <= width;

        if (!current.empty() && fitsOnCurrent) {
            current += ' ';
            current += token;
            continue;
        }

        // Either this is the very first word or the current line is full.
        if (!current.empty()) {
            wrappedLines.push_back(current);
        }
        current = token;
    }

    if (!current.empty()) {
        wrappedLines.push_back(current);
    }

    return wrappedLines;
}
109
110///
111/// Formats:
112/// prefix ┬ headLine
113/// ├ <first message line>
114/// │ <middle lines>
115/// └ <last message line>
116///
117/// If `message` parses as JSON, we pretty‐print it (indent=2).
118/// Otherwise we wrap it to the terminal width.
119///
120/// Returns the whole formatted string (with trailing newline on each line).
121///
122std::vector<std::string> static myformat(const std::string& prefix,
123 const std::string& headLine,
124 const std::string& message,
125 std::size_t initialPrefixLength = 0) {
126 // how many spaces before the box‐drawing char on subsequent lines?
127 const size_t prefixLen = prefix.size();
128 const size_t indentCount = prefixLen + 1; // +1 for the space before ┬, +33 for easylogging++ prefix format
129 const std::string indent(indentCount, ' ');
130
131 std::vector<std::string> lines;
132
133 const int termWidth = getTerminalWidth();
134
135 size_t avail = (termWidth > int(indentCount + 2)) ? static_cast<std::size_t>(termWidth) - (indentCount + 2) : 20u;
136
137 auto wrapped = wrapParagraph(prefix + " ┬ " + headLine, avail - (prefix.length() + 2));
138
139 if (wrapped.empty()) {
140 wrapped.push_back("");
141 }
142
143 // lines.insert(lines.end(), wrapped.begin(), wrapped.end());
144
145 bool first = true;
146 for (const auto& line : wrapped) {
147 lines.emplace_back((first ? "" : indent + "│ ") + line);
148 first = false;
149 }
150
151 // try parsing as JSON
152 try {
153 auto j = json::parse(message);
154 // pretty‐print with 2-space indent
155 std::string pretty = j.dump(2);
156 // split into lines
157 std::istringstream prettyIStringStream(pretty);
158
159 for (auto [line, lineNumnber] = std::tuple{std::string(""), 0}; std::getline(prettyIStringStream, line); lineNumnber++) {
160 if (lineNumnber == 0 && !prettyIStringStream.eof()) {
161 lines.push_back(indent + "├ " + line);
162 } else if (prettyIStringStream.eof()) {
163 lines.push_back(indent + "└ " + line);
164 } else {
165 lines.push_back(indent + "│ " + line);
166 }
167 }
168 } catch (json::parse_error&) {
169 // not JSON → wrap text
170
171 // break original message on hard newlines and wrap each paragraph
172 std::istringstream messageIStringStream(message);
173 std::vector<std::string> allLines;
174 for (std::string line; std::getline(messageIStringStream, line);) {
175 wrapped = wrapParagraph(line, avail - initialPrefixLength);
176
177 if (wrapped.empty()) {
178 wrapped.push_back("");
179 }
180
181 allLines.insert(allLines.end(), wrapped.begin(), wrapped.end());
182 }
183
184 if (!allLines.empty() && allLines.back().empty()) {
185 allLines.pop_back();
186 }
187
188 // emit with ├, │ and └
189 for (std::size_t lineNumber = 0; lineNumber < allLines.size(); ++lineNumber) {
190 if (lineNumber == 0 && lineNumber + 1 != allLines.size()) {
191 lines.push_back(indent + "├ " + allLines[lineNumber]);
192 } else if (lineNumber + 1 == allLines.size()) {
193 lines.push_back(indent + "└ " + allLines[lineNumber]);
194 } else {
195 lines.push_back(indent + "│ " + allLines[lineNumber]);
196 }
197 }
198 }
199
200 return lines;
201}
202
203// 2025-05-28 17:46:11 0000000014358
204static const std::string formatAsLogString(const std::string& prefix, const std::string& headLine, const std::string& message) {
205 std::ostringstream formatAsLogStringStream;
206
207 for (const std::string& line : myformat(prefix, headLine, message, 34)) {
208 formatAsLogStringStream << (formatAsLogStringStream.view().empty() ? "" : " ") << line << "\n";
209 }
210
211 std::string formatStr = formatAsLogStringStream.str();
212
213 formatStr.pop_back();
214
215 return formatStr;
216}
217
218namespace mqtt::mqttcli::lib {
219
220 Mqtt::Mqtt(const std::string& connectionName,
221 const std::string& clientId,
222 uint8_t qoSDefault,
223 uint16_t keepAlive,
224 bool cleanSession,
225 const std::string& willTopic,
226 const std::string& willMessage,
227 uint8_t willQoS,
228 bool willRetain,
229 const std::string& username,
230 const std::string& password,
231 const std::list<std::string>& subTopics,
232 const std::string& pubTopic,
233 const std::string& pubMessage,
234 bool pubRetain,
235 const std::string& sessionStoreFileName)
236 : iot::mqtt::client::Mqtt(connectionName, clientId, keepAlive, sessionStoreFileName)
237 , qoSDefault(qoSDefault)
238 , cleanSession(cleanSession)
239 , willTopic(willTopic)
240 , willMessage(willMessage)
241 , willQoS(willQoS)
242 , willRetain(willRetain)
243 , username(username)
244 , password(password)
245 , subTopics(subTopics)
246 , pubTopic(pubTopic)
247 , pubMessage(pubMessage)
248 , pubRetain(pubRetain) {
249 VLOG(1) << "Client Id: " << clientId;
250 VLOG(1) << " Keep Alive: " << keepAlive;
251 VLOG(1) << " Clean Session: " << cleanSession;
252 VLOG(1) << " Will Topic: " << willTopic;
253 VLOG(1) << " Will Message: " << willMessage;
254 VLOG(1) << " Will QoS: " << static_cast<uint16_t>(willQoS);
255 VLOG(1) << " Will Retain " << willRetain;
256 VLOG(1) << " Username: " << username;
257 VLOG(1) << " Password: " << password;
258 }
259
261 VLOG(1) << "MQTT: Initiating Session";
262
264 }
265
266 bool Mqtt::onSignal(int signum) {
267 VLOG(1) << "MQTT: On Exit due to '" << strsignal(signum) << "' (SIG" << utils::system::sigabbrev_np(signum) << " = " << signum
268 << ")";
269
270 sendDisconnect();
271
272 return Super::onSignal(signum);
273 }
274
/// Converts the textual QoS part of a composite topic into its numeric value.
///
/// The whole string must be an unsigned decimal number; leading whitespace,
/// signs and trailing characters (e.g. "1abc") are rejected.
///
/// @param qoSString  the text to convert
/// @return the QoS level, 0, 1 or 2
/// @throws std::invalid_argument if `qoSString` is not a plain decimal number
/// @throws std::out_of_range     if the number is greater than 2
///
/// Both exception types derive from std::logic_error.
static uint8_t getQos(const std::string& qoSString) {
    // std::stoul on its own would skip leading whitespace and accept a sign.
    if (qoSString.empty() || qoSString.front() < '0' || qoSString.front() > '9') {
        throw std::invalid_argument("qos " + qoSString + " is not a number");
    }

    std::size_t parsedLength = 0;
    const unsigned long qoS = std::stoul(qoSString, &parsedLength);

    // std::stoul stops at the first non-digit; anything left over is garbage.
    if (parsedLength != qoSString.size()) {
        throw std::invalid_argument("qos " + qoSString + " is not a number");
    }

    if (qoS > 2) {
        throw std::out_of_range("qos " + qoSString + " not in range [0..2]");
    }

    return static_cast<uint8_t>(qoS);
}
284
285 void Mqtt::onConnack(const iot::mqtt::packets::Connack& connack) {
286 bool sendDisconnectFlag = true;
287
288 if (connack.getReturnCode() == 0) {
289 if (!subTopics.empty()) {
290 VLOG(0) << "MQTT Subscribe";
291
292 try {
293 std::list<iot::mqtt::Topic> topicList;
294 std::transform(subTopics.begin(),
295 subTopics.end(),
296 std::back_inserter(topicList),
297 [qoSDefault = this->qoSDefault](const std::string& compositTopic) -> iot::mqtt::Topic {
298 std::size_t pos = compositTopic.rfind("##");
299
300 const std::string topic = compositTopic.substr(0, pos);
301 uint8_t qoS = qoSDefault;
302
303 if (pos != std::string::npos) {
304 try {
305 qoS = getQos(compositTopic.substr(pos + 2));
306 } catch (const std::logic_error& error) {
307 VLOG(0) << "[" << Color::Code::FG_RED << "Error" << Color::Code::FG_DEFAULT
308 << "] Malformed composit topic: " << compositTopic << "\n"
309 << error.what();
310 throw;
311 }
312 }
313 VLOG(0) << " t: " << static_cast<int>(qoS) << " | " << topic;
314 return iot::mqtt::Topic(topic, qoS);
315 });
316 sendSubscribe(topicList);
317
318 sendDisconnectFlag = false;
319 } catch (const std::logic_error&) {
320 }
321 }
322
323 if (!pubTopic.empty()) {
324 VLOG(0) << "MQTT Publish";
325
326 std::size_t pos = pubTopic.rfind("##");
327
328 const std::string topic = pubTopic.substr(0, pos);
329
330 uint8_t qoS = qoSDefault;
331
332 try {
333 if (pos != std::string::npos) {
334 try {
335 qoS = getQos(pubTopic.substr(pos + 2));
336 } catch (const std::logic_error& error) {
337 VLOG(0) << "[" << Color::Code::FG_RED << "Error" << Color::Code::FG_DEFAULT
338 << "] Malformed composit topic: " << pubTopic << "\n"
339 << error.what();
340 throw;
341 }
342 }
343 sendPublish(topic, pubMessage, qoS, pubRetain);
344
345 sendDisconnectFlag = qoS > 0 ? false : sendDisconnectFlag;
346 } catch (const std::logic_error&) {
347 }
348 }
349 if (sendDisconnectFlag) {
350 sendDisconnect();
351 }
352 } else {
353 sendDisconnect();
354 }
355 }
356
357 void Mqtt::onSuback(const iot::mqtt::packets::Suback& suback) {
358 VLOG(1) << "MQTT Suback";
359
360 for (auto returnCode : suback.getReturnCodes()) {
361 VLOG(0) << " r: " << static_cast<int>(returnCode);
362 }
363 }
364
365 void Mqtt::onPublish(const iot::mqtt::packets::Publish& publish) {
366 std::string prefix = "MQTT Publish";
367 std::string headLine = publish.getTopic() + " │ QoS: " + std::to_string(static_cast<uint16_t>(publish.getQoS())) +
368 " │ Retain: " + (publish.getRetain() != 0 ? "true" : "false") +
369 " │ Dup: " + (publish.getDup() != 0 ? "true" : "false");
370
371 VLOG(0) << formatAsLogString(prefix, headLine, publish.getMessage());
372 }
373
374 void Mqtt::onPuback([[maybe_unused]] const iot::mqtt::packets::Puback& puback) {
375 if (subTopics.empty()) {
376 sendDisconnect();
377 }
378 }
379
380 void Mqtt::onPubcomp([[maybe_unused]] const iot::mqtt::packets::Pubcomp& pubcomp) {
381 if (subTopics.empty()) {
382 sendDisconnect();
383 }
384 }
385
386} // namespace mqtt::mqttcli::lib
nlohmann::json json
const std::string willTopic
Definition Mqtt.h:91
bool onSignal(int signum) final
Definition Mqtt.cpp:266
const uint8_t qoSDefault
Definition Mqtt.h:88
const bool willRetain
Definition Mqtt.h:94
void onConnack(const iot::mqtt::packets::Connack &connack) final
Definition Mqtt.cpp:285
void onConnected() final
Definition Mqtt.cpp:260
const std::string username
Definition Mqtt.h:95
const uint8_t willQoS
Definition Mqtt.h:93
iot::mqtt::client::Mqtt Super
Definition Mqtt.h:77
const std::string password
Definition Mqtt.h:96
const std::list< std::string > subTopics
Definition Mqtt.h:98
void onPublish(const iot::mqtt::packets::Publish &publish) final
Definition Mqtt.cpp:365
Mqtt(const std::string &connectionName, const std::string &clientId, uint8_t qoSDefault, uint16_t keepAlive, bool cleanSession, const std::string &willTopic, const std::string &willMessage, uint8_t willQoS, bool willRetain, const std::string &username, const std::string &password, const std::list< std::string > &subTopics, const std::string &pubTopic, const std::string &pubMessage, bool pubRetain=false, const std::string &sessionStoreFileName="")
Definition Mqtt.cpp:220
const bool cleanSession
Definition Mqtt.h:89
const std::string pubMessage
Definition Mqtt.h:100
void onSuback(const iot::mqtt::packets::Suback &suback) final
Definition Mqtt.cpp:357
void onPuback(const iot::mqtt::packets::Puback &puback) final
Definition Mqtt.cpp:374
const bool pubRetain
Definition Mqtt.h:101
void onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp) final
Definition Mqtt.cpp:380
const std::string pubTopic
Definition Mqtt.h:99
const std::string willMessage
Definition Mqtt.h:92
static std::vector< std::string > myformat(const std::string &prefix, const std::string &headLine, const std::string &message, std::size_t initialPrefixLength=0)
Definition Mqtt.cpp:122
static const std::string formatAsLogString(const std::string &prefix, const std::string &headLine, const std::string &message)
Definition Mqtt.cpp:204
static int getTerminalWidth()
Definition Mqtt.cpp:76
static std::vector< std::string > wrapParagraph(const std::string &text, std::size_t width)
Definition Mqtt.cpp:88
static uint8_t getQos(const std::string &qoSString)
Definition Mqtt.cpp:275