MQTTSuite
Loading...
Searching...
No Matches
Mqtt.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "Mqtt.h"
43
44#include <iot/mqtt/Topic.h>
45#include <iot/mqtt/packets/Connack.h>
46#include <iot/mqtt/packets/Publish.h>
47#include <iot/mqtt/packets/Suback.h>
48
49#ifndef DOXYGEN_SHOULD_SKIP_THIS
50
51#include <algorithm>
52#include <cstring>
53#include <iterator>
54#include <list>
55#include <log/Logger.h>
56#include <map>
57#include <nlohmann/json_fwd.hpp>
58#include <sstream>
59#include <stdexcept>
60#include <string>
61#include <string_view>
62#include <sys/ioctl.h>
63#include <tuple>
64#include <unistd.h>
65#include <utils/system/signal.h>
66#include <vector>
67
68#endif
69
70#include <nlohmann/json.hpp>
71
// Report the column count of the terminal attached to stdout.
// When the TIOCGWINSZ query fails or reports zero columns, 80 is returned.
static int getTerminalWidth() {
    struct winsize windowSize;

    const bool queried = ioctl(STDOUT_FILENO, TIOCGWINSZ, &windowSize) == 0;
    if (!queried || windowSize.ws_col == 0) {
        return 80;
    }

    return windowSize.ws_col;
}
83
// Greedy word wrap for a single paragraph.
//
// `text` is split on whitespace; words are packed into lines joined by one
// space as long as the line stays within `width` characters. A word that is
// longer than `width` is placed on a line of its own (it is never broken).
// Returns the lines in order; an empty or all-whitespace `text` yields no lines.
static std::vector<std::string> wrapParagraph(const std::string& text, std::size_t width) {
    std::vector<std::string> result;
    std::string current;

    std::istringstream tokenizer(text);
    for (std::string token; tokenizer >> token;) {
        const bool fits = current.size() + 1 + token.size() <= width;

        if (!current.empty() && fits) {
            // extend the line that is currently being built
            current.push_back(' ');
            current.append(token);
            continue;
        }

        // either the very first word, or the word does not fit anymore
        if (!current.empty()) {
            result.push_back(current);
        }
        current = token;
    }

    if (!current.empty()) {
        result.push_back(current);
    }

    return result;
}
106
107///
108/// Formats:
109/// prefix ┬ headLine
110/// ├ <first message line>
111/// │ <middle lines>
112/// └ <last message line>
113///
114/// If `message` parses as JSON, we pretty‐print it (indent=2).
115/// Otherwise we wrap it to the terminal width.
116///
117/// Returns the whole formatted string (with trailing newline on each line).
118///
119std::vector<std::string> static myformat(const std::string& prefix,
120 const std::string& headLine,
121 const std::string& message,
122 std::size_t initialPrefixLength = 0) {
123 // how many spaces before the box‐drawing char on subsequent lines?
124 const size_t prefixLen = prefix.size();
125 const size_t indentCount = prefixLen + 1; // +1 for the space before ┬, +33 for easylogging++ prefix format
126 const std::string indent(indentCount, ' ');
127
128 std::vector<std::string> lines;
129
130 const int termWidth = getTerminalWidth();
131
132 size_t avail = (termWidth > int(indentCount + 2)) ? static_cast<std::size_t>(termWidth) - (indentCount + 2) : 20u;
133
134 auto wrapped = wrapParagraph(prefix + " ┬ " + headLine, avail - (prefix.length() + 2));
135
136 if (wrapped.empty()) {
137 wrapped.push_back("");
138 }
139
140 // lines.insert(lines.end(), wrapped.begin(), wrapped.end());
141
142 bool first = true;
143 for (const auto& line : wrapped) {
144 lines.emplace_back((first ? "" : indent + "│ ") + line);
145 first = false;
146 }
147
148 // try parsing as JSON
149 try {
150 auto j = nlohmann::json::parse(message);
151 // pretty‐print with 2-space indent
152 std::string pretty = j.dump(2);
153 // split into lines
154 std::istringstream prettyIStringStream(pretty);
155
156 for (auto [line, lineNumnber] = std::tuple{std::string(""), 0}; std::getline(prettyIStringStream, line); lineNumnber++) {
157 if (lineNumnber == 0 && !prettyIStringStream.eof()) {
158 lines.push_back(indent + "├ " + line);
159 } else if (prettyIStringStream.eof()) {
160 lines.push_back(indent + "└ " + line);
161 } else {
162 lines.push_back(indent + "│ " + line);
163 }
164 }
165 } catch (nlohmann::json::parse_error&) {
166 // not JSON → wrap text
167
168 // break original message on hard newlines and wrap each paragraph
169 std::istringstream messageIStringStream(message);
170 std::vector<std::string> allLines;
171 for (std::string line; std::getline(messageIStringStream, line);) {
172 wrapped = wrapParagraph(line, avail - initialPrefixLength);
173
174 if (wrapped.empty()) {
175 wrapped.push_back("");
176 }
177
178 allLines.insert(allLines.end(), wrapped.begin(), wrapped.end());
179 }
180
181 if (!allLines.empty() && allLines.back().empty()) {
182 allLines.pop_back();
183 }
184
185 // emit with ├, │ and └
186 for (std::size_t lineNumber = 0; lineNumber < allLines.size(); ++lineNumber) {
187 if (lineNumber == 0 && lineNumber + 1 != allLines.size()) {
188 lines.push_back(indent + "├ " + allLines[lineNumber]);
189 } else if (lineNumber + 1 == allLines.size()) {
190 lines.push_back(indent + "└ " + allLines[lineNumber]);
191 } else {
192 lines.push_back(indent + "│ " + allLines[lineNumber]);
193 }
194 }
195 }
196
197 return lines;
198}
199
200// 2025-05-28 17:46:11 0000000014358
201static const std::string formatAsLogString(const std::string& prefix, const std::string& headLine, const std::string& message) {
202 std::ostringstream formatAsLogStringStream;
203
204 for (const std::string& line : myformat(prefix, headLine, message, 34)) {
205 formatAsLogStringStream << (formatAsLogStringStream.view().empty() ? "" : " ") << line << "\n";
206 }
207
208 std::string formatStr = formatAsLogStringStream.str();
209
210 formatStr.pop_back();
211
212 return formatStr;
213}
214
215namespace mqtt::mqttcli::lib {
216
217 Mqtt::Mqtt(const std::string& connectionName,
218 const std::string& clientId,
219 uint8_t qoSDefault,
220 uint16_t keepAlive,
221 bool cleanSession,
222 const std::string& willTopic,
223 const std::string& willMessage,
224 uint8_t willQoS,
225 bool willRetain,
226 const std::string& username,
227 const std::string& password,
228 const std::list<std::string>& subTopics,
229 const std::string& pubTopic,
230 const std::string& pubMessage,
231 bool pubRetain,
232 const std::string& sessionStoreFileName)
233 : iot::mqtt::client::Mqtt(connectionName, clientId, keepAlive, sessionStoreFileName)
234 , qoSDefault(qoSDefault)
235 , cleanSession(cleanSession)
236 , willTopic(willTopic)
237 , willMessage(willMessage)
238 , willQoS(willQoS)
239 , willRetain(willRetain)
240 , username(username)
241 , password(password)
242 , subTopics(subTopics)
243 , pubTopic(pubTopic)
244 , pubMessage(pubMessage)
245 , pubRetain(pubRetain) {
246 VLOG(1) << "Client Id: " << clientId;
247 VLOG(1) << " Keep Alive: " << keepAlive;
248 VLOG(1) << " Clean Session: " << cleanSession;
249 VLOG(1) << " Will Topic: " << willTopic;
250 VLOG(1) << " Will Message: " << willMessage;
251 VLOG(1) << " Will QoS: " << static_cast<uint16_t>(willQoS);
252 VLOG(1) << " Will Retain " << willRetain;
253 VLOG(1) << " Username: " << username;
254 VLOG(1) << " Password: " << password;
255 }
256
258 VLOG(1) << "MQTT: Initiating Session";
259
261 }
262
263 bool Mqtt::onSignal(int signum) {
264 VLOG(1) << "MQTT: On Exit due to '" << strsignal(signum) << "' (SIG" << utils::system::sigabbrev_np(signum) << " = " << signum
265 << ")";
266
267 sendDisconnect();
268
269 return Super::onSignal(signum);
270 }
271
/// Parses the textual QoS part of a composite topic ("topic##qos").
///
/// @param qoSString the text following the "##" separator
/// @return the QoS level, 0, 1 or 2
/// @throws std::invalid_argument if `qoSString` is not a number or is followed
///         by trailing characters (e.g. "1x")
/// @throws std::out_of_range if the number is larger than 2
///
/// Both exception types derive from std::logic_error, which is what the
/// callers catch.
static uint8_t getQos(const std::string& qoSString) {
    std::size_t parsedLength = 0;
    const unsigned long qoS = std::stoul(qoSString, &parsedLength);

    // std::stoul stops at the first non-numeric character without complaining;
    // anything left over means the QoS specification is malformed.
    if (parsedLength != qoSString.size()) {
        throw std::invalid_argument("qos " + qoSString + " is not a valid number");
    }

    if (qoS > 2) {
        throw std::out_of_range("qos " + qoSString + " not in range [0..2]");
    }

    return static_cast<uint8_t>(qoS);
}
281
282 void Mqtt::onConnack(const iot::mqtt::packets::Connack& connack) {
283 if (connack.getReturnCode() == 0) {
284 bool sendDisconnectFlag = true;
285
286 if (!subTopics.empty()) {
287 VLOG(0) << "MQTT Subscribe";
288
289 try {
290 std::list<iot::mqtt::Topic> topicList;
291 std::transform(subTopics.begin(),
292 subTopics.end(),
293 std::back_inserter(topicList),
294 [qoSDefault = this->qoSDefault](const std::string& compositTopic) -> iot::mqtt::Topic {
295 std::size_t pos = compositTopic.rfind("##");
296
297 const std::string topic = compositTopic.substr(0, pos);
298 uint8_t qoS = qoSDefault;
299
300 if (pos != std::string::npos) {
301 try {
302 qoS = getQos(compositTopic.substr(pos + 2));
303 } catch (const std::logic_error& error) {
304 VLOG(0) << "[" << Color::Code::FG_RED << "Error" << Color::Code::FG_DEFAULT
305 << "] Malformed composit topic: " << compositTopic << "\n"
306 << error.what();
307 throw;
308 }
309 }
310 VLOG(0) << " t: " << static_cast<int>(qoS) << " | " << topic;
311 return iot::mqtt::Topic(topic, qoS);
312 });
313 sendSubscribe(topicList);
314
315 sendDisconnectFlag = false;
316 } catch (const std::logic_error&) {
317 }
318 }
319
320 if (!pubTopic.empty()) {
321 VLOG(0) << "MQTT Publish";
322
323 std::size_t pos = pubTopic.rfind("##");
324
325 const std::string topic = pubTopic.substr(0, pos);
326
327 uint8_t qoS = qoSDefault;
328
329 try {
330 if (pos != std::string::npos) {
331 try {
332 qoS = getQos(pubTopic.substr(pos + 2));
333 } catch (const std::logic_error& error) {
334 VLOG(0) << "[" << Color::Code::FG_RED << "Error" << Color::Code::FG_DEFAULT
335 << "] Malformed composit topic: " << pubTopic << "\n"
336 << error.what();
337 throw;
338 }
339 }
340 sendPublish(topic, pubMessage, qoS, pubRetain);
341
342 sendDisconnectFlag = qoS > 0 ? false : sendDisconnectFlag;
343 } catch (const std::logic_error&) {
344 }
345 }
346 if (sendDisconnectFlag) {
347 sendDisconnect();
348 }
349 } else {
350 sendDisconnect();
351 }
352 }
353
354 void Mqtt::onSuback(const iot::mqtt::packets::Suback& suback) {
355 VLOG(1) << "MQTT Suback";
356
357 for (auto returnCode : suback.getReturnCodes()) {
358 VLOG(0) << " r: " << static_cast<int>(returnCode);
359 }
360 }
361
362 void Mqtt::onPublish(const iot::mqtt::packets::Publish& publish) {
363 std::string prefix = "MQTT Publish";
364 std::string headLine = publish.getTopic() + " │ QoS: " + std::to_string(static_cast<uint16_t>(publish.getQoS())) +
365 " │ Retain: " + (publish.getRetain() != 0 ? "true" : "false") +
366 " │ Dup: " + (publish.getDup() != 0 ? "true" : "false");
367
368 VLOG(0) << formatAsLogString(prefix, headLine, publish.getMessage());
369 }
370
371 void Mqtt::onPuback([[maybe_unused]] const iot::mqtt::packets::Puback& puback) {
372 if (subTopics.empty()) {
373 sendDisconnect();
374 }
375 }
376
377 void Mqtt::onPubcomp([[maybe_unused]] const iot::mqtt::packets::Pubcomp& pubcomp) {
378 if (subTopics.empty()) {
379 sendDisconnect();
380 }
381 }
382
383} // namespace mqtt::mqttcli::lib
const std::string willTopic
Definition Mqtt.h:91
bool onSignal(int signum) final
Definition Mqtt.cpp:263
const uint8_t qoSDefault
Definition Mqtt.h:88
const bool willRetain
Definition Mqtt.h:94
void onConnack(const iot::mqtt::packets::Connack &connack) final
Definition Mqtt.cpp:282
void onConnected() final
Definition Mqtt.cpp:257
const std::string username
Definition Mqtt.h:95
const uint8_t willQoS
Definition Mqtt.h:93
iot::mqtt::client::Mqtt Super
Definition Mqtt.h:77
const std::string password
Definition Mqtt.h:96
const std::list< std::string > subTopics
Definition Mqtt.h:98
void onPublish(const iot::mqtt::packets::Publish &publish) final
Definition Mqtt.cpp:362
Mqtt(const std::string &connectionName, const std::string &clientId, uint8_t qoSDefault, uint16_t keepAlive, bool cleanSession, const std::string &willTopic, const std::string &willMessage, uint8_t willQoS, bool willRetain, const std::string &username, const std::string &password, const std::list< std::string > &subTopics, const std::string &pubTopic, const std::string &pubMessage, bool pubRetain=false, const std::string &sessionStoreFileName="")
Definition Mqtt.cpp:217
const bool cleanSession
Definition Mqtt.h:89
const std::string pubMessage
Definition Mqtt.h:100
void onSuback(const iot::mqtt::packets::Suback &suback) final
Definition Mqtt.cpp:354
void onPuback(const iot::mqtt::packets::Puback &puback) final
Definition Mqtt.cpp:371
const bool pubRetain
Definition Mqtt.h:101
void onPubcomp(const iot::mqtt::packets::Pubcomp &pubcomp) final
Definition Mqtt.cpp:377
const std::string pubTopic
Definition Mqtt.h:99
const std::string willMessage
Definition Mqtt.h:92
static std::vector< std::string > myformat(const std::string &prefix, const std::string &headLine, const std::string &message, std::size_t initialPrefixLength=0)
Definition Mqtt.cpp:119
static const std::string formatAsLogString(const std::string &prefix, const std::string &headLine, const std::string &message)
Definition Mqtt.cpp:201
static int getTerminalWidth()
Definition Mqtt.cpp:73
static std::vector< std::string > wrapParagraph(const std::string &text, std::size_t width)
Definition Mqtt.cpp:85
static uint8_t getQos(const std::string &qoSString)
Definition Mqtt.cpp:272