MQTTSuite
Loading...
Searching...
No Matches
mqttcli.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "SocketContextFactory.h" // IWYU pragma: keep
43#include "config.h"
44
45#include <web/http/http_utils.h>
46
47#ifdef LINK_SUBPROTOCOL_STATIC
48
49#include "websocket/SubProtocolFactory.h"
50
51#include <web/websocket/client/SubProtocolFactorySelector.h>
52
53#endif
54
55#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
56
57#include <web/websocket/client/SocketContextUpgradeFactory.h>
58
59#endif
60
61#ifndef DOXYGEN_SHOULD_SKIP_THIS
62
63#include <core/SNodeC.h>
64//
65#include <net/in/stream/legacy/SocketClient.h>
66#include <net/in/stream/tls/SocketClient.h>
67#include <net/in6/stream/legacy/SocketClient.h>
68#include <net/in6/stream/tls/SocketClient.h>
69#include <net/un/stream/legacy/SocketClient.h>
70#include <net/un/stream/tls/SocketClient.h>
71#include <web/http/legacy/in/Client.h>
72#include <web/http/legacy/in6/Client.h>
73#include <web/http/legacy/un/Client.h>
74#include <web/http/tls/in/Client.h>
75#include <web/http/tls/in6/Client.h>
76#include <web/http/tls/un/Client.h>
77//
78#include <log/Logger.h>
79#include <utils/Config.h>
80//
81#include <utils/CLI11.hpp>
82//
83#include <string>
84
85#endif
86
87static void
88reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
89 switch (state) {
90 case core::socket::State::OK:
91 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
92 break;
93 case core::socket::State::DISABLED:
94 VLOG(1) << instanceName << ": disabled";
95 break;
96 case core::socket::State::ERROR:
97 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
98 break;
99 case core::socket::State::FATAL:
100 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
101 break;
102 }
103}
104
105static void logResponse(const std::shared_ptr<web::http::client::Request>& req, const std::shared_ptr<web::http::client::Response>& res) {
106 VLOG(1) << req->getSocketContext()->getSocketConnection()->getConnectionName() << " HTTP response for: " << req->method << " "
107 << req->url << " HTTP/" << req->httpMajor << "." << req->httpMinor << "\n"
108 << httputils::toString(req->method,
109 req->url,
110 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
111 req->getQueries(),
112 req->getHeaders(),
113 req->getTrailer(),
114 req->getCookies(),
115 {})
116 << "\n"
117 << httputils::toString(res->httpVersion, res->statusCode, res->reason, res->headers, res->cookies, res->body);
118}
119
120template <typename HttpClient>
121void startClient(const std::string& name, const std::function<void(typename HttpClient::Config&)>& configurator) {
122 using SocketAddress = typename HttpClient::SocketAddress;
123
124 const HttpClient httpClient(
125 name,
126 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
127 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
128 const std::string target = req->getSocketContext()
129 ->getSocketConnection()
130 ->getConfigInstance()
131 ->getSection("http")
132 ->get_option("--target")
133 ->as<std::string>();
134
135 req->set("Sec-WebSocket-Protocol", "mqtt");
136
137 req->upgrade(
138 target,
139 "websocket",
140 [connectionName](bool success) {
141 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
142 << "mqtt" << ") start " << (success ? "success" : "failed");
143 },
144 [connectionName](const std::shared_ptr<web::http::client::Request>& req,
145 const std::shared_ptr<web::http::client::Response>& res,
146 bool success) {
147 logResponse(req, res);
148
149 VLOG(1) << connectionName << ": HTTP Upgrade " << (success ? "success" : "failed");
150 },
151 [connectionName]([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req, const std::string& message) {
152 VLOG(1) << connectionName << ": Response parse error: " << message;
153 VLOG(1) << " Request was: " << req->method << " " << req->url << " HTTP/" << req->httpMajor << "." << req->httpMinor
154 << "\n"
155 << httputils::toString(req->method,
156 req->url,
157 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
158 req->getQueries(),
159 req->getHeaders(),
160 req->getTrailer(),
161 req->getCookies(),
162 {})
163 << "\n";
164 });
165 },
166 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
167 VLOG(1) << "Session ended";
168 });
169
170 configurator(httpClient.getConfig());
171
172 httpClient.connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
173 reportState(name, socketAddress, state);
174 });
175}
176
177static void createConfig(CLI::App* sessionApp, CLI::App* subApp, CLI::App* pubApp) {
178 sessionApp->configurable(false);
179 subApp->configurable(false);
180 pubApp->configurable(false);
181
182 CLI::Option* clientIdOpt = sessionApp->add_option("--client-id", "MQTT Client-ID")
183 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
184 ->type_name("string");
185
186 sessionApp->add_option("--qos", "Quality of service")
187 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
188 ->type_name("uint8_t")
189 ->default_val(0)
190 ->configurable();
191
192 sessionApp->add_flag("--retain-session{true},-r{true}", "Clean session")
193 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
194 ->type_name("bool")
195 ->default_str("false")
196 ->check(CLI::IsMember({"true", "false"}))
197 ->configurable()
198 ->needs(clientIdOpt);
199
200 sessionApp->add_option("--keep-alive", "Quality of service")
201 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
202 ->type_name("uint8_t")
203 ->default_val(60)
204 ->configurable();
205
206 sessionApp->add_option("--will-topic", "MQTT will topic")
207 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
208 ->type_name("string")
209 ->configurable();
210
211 sessionApp->add_option("--will-message", "MQTT will message")
212 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
213 ->type_name("string")
214 ->configurable();
215
216 sessionApp->add_option("--will-qos", "MQTT will quality of service")
217 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
218 ->type_name("uint8_t")
219 ->default_val(0)
220 ->configurable();
221
222 sessionApp->add_flag("--will-retain{true}", "MQTT will message retain")
223 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
224 ->default_str("false")
225 ->type_name("bool")
226 ->check(CLI::IsMember({"true", "false"}))
227 ->configurable();
228
229 sessionApp->add_option("--username", "MQTT username")
230 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
231 ->type_name("string")
232 ->configurable();
233
234 sessionApp->add_option("--password", "MQTT password")
235 ->group(sessionApp->get_formatter()->get_label("Persistent Options"))
236 ->type_name("string")
237 ->configurable();
238
239 subApp->needs(subApp
240 ->add_option_function<std::string>(
241 "--topic",
242 [subApp](const std::string& value) {
243 if (value == "") {
244 subApp->get_option("--topic")->required(false)->clear();
245 subApp->remove_needs(subApp->get_option("--topic"));
246 }
247 },
248 "List of topics subscribing to")
249 ->group(subApp->get_formatter()->get_label("Persistent Options"))
250 ->type_name("string list")
251 ->take_all()
252 ->required()
253 ->allow_extra_args()
254 ->configurable());
255
256 pubApp->needs(pubApp
257 ->add_option_function<std::string>(
258 "--topic",
259 [pubApp](const std::string& value) {
260 if (value == "") {
261 pubApp->get_option("--topic")->required(false)->clear();
262 pubApp->remove_needs(pubApp->get_option("--topic"));
263
264 pubApp->get_option("--message")->required(false)->clear();
265 pubApp->remove_needs(pubApp->get_option("--message"));
266 }
267 },
268 "Topic publishing to")
269 ->group(pubApp->get_formatter()->get_label("Persistent Options"))
270 ->type_name("string")
271 ->required()
272 ->configurable());
273
274 pubApp->needs(pubApp
275 ->add_option_function<std::string>(
276 "--message",
277 [pubApp](const std::string& value) {
278 if (value == "") {
279 pubApp->get_option("--topic")->required(false)->clear();
280 pubApp->remove_needs(pubApp->get_option("--topic"));
281
282 pubApp->get_option("--message")->required(false)->clear();
283 pubApp->remove_needs(pubApp->get_option("--message"));
284 }
285 },
286 "Message to publish")
287 ->group(pubApp->get_formatter()->get_label("Persistent Options"))
288 ->type_name("string")
289 ->required()
290 ->configurable());
291
292 pubApp->add_flag("--retain{true},-r{true}", "Retain message")
293 ->group(pubApp->get_formatter()->get_label("Persistent Options"))
294 ->default_str("false")
295 ->type_name("bool")
296 ->check(CLI::IsMember({"true", "false"}))
297 ->configurable();
298}
299
300static void createConfig(net::config::ConfigInstance& config) {
301 createConfig(config.addSection("session", "MQTT session behavior", "Connection"),
302 config.addSection("sub", "Configuration for application mqttsub", "Applications"),
303 config.addSection("pub", "Configuration for application mqttpub", "Applications"));
304
305 config.get()->require_callback([config = &config]() {
306 if (!config->getDisabled() && utils::Config::showConfigTriggerApp == nullptr &&
307 utils::Config::app->get_option("--write-config")->count() == 0) {
308 CLI::App* pubApp = config->getSection("pub", true, true);
309 CLI::App* subApp = config->getSection("sub", true, true);
310
311 if ((pubApp == nullptr || (*pubApp)["--topic"]->count() == 0 || (*pubApp)["--message"]->count() == 0) &&
312 (subApp == nullptr || (*subApp)["--topic"]->count() == 0)) {
313 throw CLI::RequiresError(config->get()->get_parent()->get_name() + ":" + config->getInstanceName() +
314 " requires at least one of {sub | pub}",
315 CLI::ExitCodes::RequiresError);
316 }
317
318 if (pubApp != nullptr) {
319 VLOG(0) << "[" << Color::Code::FG_LIGHT_GREEN << "Success" << Color::Code::FG_DEFAULT << "] " << "Bootstrap of "
320 << config->getInstanceName() << ":pub";
321 }
322
323 if (subApp != nullptr) {
324 VLOG(0) << "[" << Color::Code::FG_LIGHT_GREEN << "Success" << Color::Code::FG_DEFAULT << "] " << "Bootstrap of "
325 << config->getInstanceName() << ":sub";
326 }
327 }
328 });
329}
330
331static void createWSConfig(net::config::ConfigInstance& config) {
332 createConfig(config);
333
334 CLI::App* http = config.getSection("http");
335 http->add_option("--target", "Websocket endpoint")
336 ->group(http->get_formatter()->get_label("Persistent Options"))
337 ->type_name("string")
338 ->default_str("/ws")
339 ->configurable();
340}
341
342int main(int argc, char* argv[]) {
343 core::SNodeC::init(argc, argv);
344
345#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
346 web::websocket::client::SocketContextUpgradeFactory::link();
347#endif
348
349#ifdef LINK_SUBPROTOCOL_STATIC
350 web::websocket::client::SubProtocolFactorySelector::link("mqtt", mqttClientSubProtocolFactory);
351#endif
352
353 utils::Config::app->get_formatter()->label("SUBCOMMAND", "APPLICATION | CONNECTION | INSTANCE");
354 utils::Config::app->get_formatter()->label("SUBCOMMANDS", "APPLICATION | CONNECTION | INSTANCES");
355
356 createConfig(utils::Config::addInstance("session", "MQTT session behavior", "Connection", true),
357 utils::Config::addInstance("sub", "Configuration for application mqttsub", "Applications", true),
358 utils::Config::addInstance("pub", "Configuration for application mqttpub", "Applications", true));
359
360 // Start of application
361
362#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4)
363 net::in::stream::legacy::Client<mqtt::mqttcli::SocketContextFactory>("in-mqtt", [](auto& config) {
364 config.Remote::setPort(1883);
365
366 config.setRetry();
367 config.setRetryBase(1);
368 config.setDisableNagleAlgorithm();
369
370 createConfig(config);
371 }).connect([](const auto& socketAddress, const core::socket::State& state) {
372 reportState("in-mqtt", socketAddress, state);
373 });
374#endif
375
376#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4)
377 net::in::stream::tls::Client<mqtt::mqttcli::SocketContextFactory>("in-mqtts", [](auto& config) {
378 config.Remote::setPort(1883);
379
380 config.setRetry();
381 config.setRetryBase(1);
382 config.setDisableNagleAlgorithm();
383 config.setDisabled();
384
385 createConfig(config);
386 }).connect([](const auto& socketAddress, const core::socket::State& state) {
387 reportState("in-mqtts", socketAddress, state);
388 });
389#endif
390
391#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6)
392 net::in6::stream::legacy::Client<mqtt::mqttcli::SocketContextFactory>("in6-mqtt", [](auto& config) {
393 config.Remote::setPort(1883);
394
395 config.setRetry();
396 config.setRetryBase(1);
397 config.setDisableNagleAlgorithm();
398 config.setDisabled();
399
400 createConfig(config);
401 }).connect([](const auto& socketAddress, const core::socket::State& state) {
402 reportState("in6-mqtt", socketAddress, state);
403 });
404#endif
405
406#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6)
407 net::in6::stream::tls::Client<mqtt::mqttcli::SocketContextFactory>("in6-mqtts", [](auto& config) {
408 config.Remote::setPort(1883);
409
410 config.setRetry();
411 config.setRetryBase(1);
412 config.setDisableNagleAlgorithm();
413 config.setDisabled();
414
415 createConfig(config);
416 }).connect([](const auto& socketAddress, const core::socket::State& state) {
417 reportState("in6-mqtts", socketAddress, state);
418 });
419#endif
420
421#if defined(CONFIG_MQTTSUITE_CLI_UNIX)
422 net::un::stream::legacy::Client<mqtt::mqttcli::SocketContextFactory>("un-mqtt", [](auto& config) {
423 config.Remote::setSunPath("/var/mqttbroker-un-mqtt");
424
425 config.setRetry();
426 config.setRetryBase(1);
427 config.setDisabled();
428
429 createConfig(config);
430 }).connect([](const auto& socketAddress, const core::socket::State& state) {
431 reportState("un-mqtt", socketAddress, state);
432 });
433#endif
434
435#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS)
436 net::un::stream::tls::Client<mqtt::mqttcli::SocketContextFactory>("un-mqtts", [](auto& config) {
437 config.Remote::setSunPath("/var/mqttbroker-un-mqtts");
438
439 config.setRetry();
440 config.setRetryBase(1);
441 config.setDisabled();
442
443 createConfig(config);
444 }).connect([](const auto& socketAddress, const core::socket::State& state) {
445 reportState("un-mqtts", socketAddress, state);
446 });
447#endif
448
449#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WS)
450 startClient<web::http::legacy::in::Client>("in-wsmqtt", [](auto& config) {
451 config.Remote::setPort(8080);
452
453 config.setRetry();
454 config.setRetryBase(1);
455 config.setDisableNagleAlgorithm();
456 config.setDisabled();
457
458 createWSConfig(config);
459 });
460#endif
461
462#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WSS)
463 startClient<web::http::tls::in::Client>("in-wsmqtts", [](auto& config) {
464 config.Remote::setPort(8088);
465
466 config.setRetry();
467 config.setRetryBase(1);
468 config.setDisableNagleAlgorithm();
469 config.setDisabled();
470
471 createWSConfig(config);
472 });
473#endif
474
475#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WS)
476 startClient<web::http::legacy::in6::Client>("in6-wsmqtt", [](auto& config) {
477 config.Remote::setPort(8080);
478
479 config.setRetry();
480 config.setRetryBase(1);
481 config.setDisableNagleAlgorithm();
482 config.setDisabled();
483
484 createWSConfig(config);
485 });
486#endif
487
488#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WSS)
489 startClient<web::http::tls::in6::Client>("in6-wsmqtts", [](auto& config) {
490 config.Remote::setPort(8088);
491
492 config.setReconnect();
493 config.setRetry();
494 config.setRetryBase(1);
495 config.setDisableNagleAlgorithm();
496 config.setDisabled();
497
498 createWSConfig(config);
499 });
500#endif
501
502#if defined(CONFIG_MQTTSUITE_CLI_UNIX) && defined(CONFIG_MQTTSUITE_CLI_WS)
503 startClient<web::http::legacy::un::Client>("un-wsmqtt", [](auto& config) {
504 config.setRetry();
505 config.setRetryBase(1);
506 config.setReconnect();
507 config.setDisabled();
508
509 createWSConfig(config);
510 });
511#endif
512
513#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS) && defined(CONFIG_MQTTSUITE_CLI_WSS)
514 startClient<web::http::tls::un::Client>("un-wsmqtts", [](auto& config) {
515 config.setRetry();
516 config.setRetryBase(1);
517 config.setReconnect();
518 config.setDisabled();
519
520 createWSConfig(config);
521 });
522#endif
523
524 return core::SNodeC::start();
525}
int main(int argc, char *argv[])
static void createConfig(CLI::App *sessionApp, CLI::App *subApp, CLI::App *pubApp)
Definition mqttcli.cpp:177
static void createWSConfig(net::config::ConfigInstance &config)
Definition mqttcli.cpp:331
void startClient(const std::string &name, const std::function< void(typename HttpClient::Config &)> &configurator)
Definition mqttcli.cpp:121
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
Definition mqttcli.cpp:88
static void logResponse(const std::shared_ptr< web::http::client::Request > &req, const std::shared_ptr< web::http::client::Response > &res)
Definition mqttcli.cpp:105
static void createConfig(net::config::ConfigInstance &config)
Definition mqttcli.cpp:300