MQTTSuite
Loading...
Searching...
No Matches
mqttcli.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "SocketContextFactory.h" // IWYU pragma: keep
43#include "config.h"
44#include "lib/ConfigSections.h" // IWYU pragma: keep
45
46#ifdef LINK_SUBPROTOCOL_STATIC
47
48#include "websocket/SubProtocolFactory.h"
49
50#include <web/websocket/client/SubProtocolFactorySelector.h>
51
52#endif
53
54#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
55
56#include <web/websocket/client/SocketContextUpgradeFactory.h>
57
58#endif
59
60#include <core/SNodeC.h>
61#include <net/config/ConfigInstance.h>
62//
63#include <net/in/stream/legacy/SocketClient.h>
64#include <net/in/stream/tls/SocketClient.h>
65#include <net/in6/stream/legacy/SocketClient.h>
66#include <net/in6/stream/tls/SocketClient.h>
67#include <net/un/stream/legacy/SocketClient.h>
68#include <net/un/stream/tls/SocketClient.h>
69#include <web/http/client/ConfigHTTP.h>
70#include <web/http/http_utils.h>
71#include <web/http/legacy/in/Client.h>
72#include <web/http/legacy/in6/Client.h>
73#include <web/http/legacy/un/Client.h>
74#include <web/http/tls/in/Client.h>
75#include <web/http/tls/in6/Client.h>
76#include <web/http/tls/un/Client.h>
77
78#ifndef DOXYGEN_SHOULD_SKIP_THIS
79
80#include <log/Logger.h>
81//
82#include <list>
83
84#endif
85
86static void
87reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
88 switch (state) {
89 case core::socket::State::OK:
90 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
91 break;
92 case core::socket::State::DISABLED:
93 VLOG(1) << instanceName << ": disabled";
94 break;
95 case core::socket::State::ERROR:
96 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
97 break;
98 case core::socket::State::FATAL:
99 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
100 break;
101 }
102}
103
104static void logResponse(const std::shared_ptr<web::http::client::Request>& req, const std::shared_ptr<web::http::client::Response>& res) {
105 VLOG(1) << req->getSocketContext()->getSocketConnection()->getConnectionName() << " HTTP response for: " << req->method << " "
106 << req->url << " HTTP/" << req->httpMajor << "." << req->httpMinor << "\n"
107 << httputils::toString(req->method,
108 req->url,
109 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
110 req->getQueries(),
111 req->getHeaders(),
112 req->getTrailer(),
113 req->getCookies(),
114 {})
115 << "\n"
116 << httputils::toString(res->httpVersion, res->statusCode, res->reason, res->headers, res->cookies, res->body);
117}
118
119template <template <typename SocketContextFactoryT, typename... ArgsT> typename SocketClient>
120static SocketClient<mqtt::mqttcli::SocketContextFactory>
121startClient(const std::string& instanceName,
122 const std::function<void(typename SocketClient<mqtt::mqttcli::SocketContextFactory>::Config*)>& configurator) {
123 using Client = SocketClient<mqtt::mqttcli::SocketContextFactory>;
124 using SocketAddress = typename Client::SocketAddress;
125
126 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator);
127
128 socketClient.getConfig()->setRetry();
129 socketClient.getConfig()->setRetryBase(1);
130 socketClient.getConfig()->setReconnect();
131 socketClient.getConfig()->setDisabled();
132
133 socketClient.connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
134 reportState(instanceName, socketAddress, state);
135 });
136
137 return socketClient;
138}
139
140template <typename HttpClient>
141static HttpClient startClient(const std::string& name, const std::function<void(typename HttpClient::Config*)>& configurator) {
142 using SocketAddress = typename HttpClient::SocketAddress;
143
144 const HttpClient httpClient(
145 name,
146 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
147 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
148 const std::string target = req->getSocketContext()
149 ->getSocketConnection()
150 ->getConfigInstance()
151 ->getSubCommand<web::http::client::ConfigHTTP>()
152 ->getOption("--target")
153 ->as<std::string>();
154
155 req->set("Sec-WebSocket-Protocol", "mqtt");
156
157 req->upgrade(
158 target,
159 "websocket",
160 [connectionName](bool success) {
161 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
162 << "mqtt" << ") start " << (success ? "success" : "failed");
163 },
164 [connectionName](const std::shared_ptr<web::http::client::Request>& req,
165 const std::shared_ptr<web::http::client::Response>& res,
166 bool success) {
167 logResponse(req, res);
168
169 VLOG(1) << connectionName << ": HTTP Upgrade " << (success ? "success" : "failed");
170 },
171 [connectionName](const std::shared_ptr<web::http::client::Request>& req, const std::string& message) {
172 VLOG(1) << connectionName << ": Response parse error: " << message;
173 VLOG(1) << " Request was: " << req->method << " " << req->url << " HTTP/" << req->httpMajor << "." << req->httpMinor
174 << "\n"
175 << httputils::toString(req->method,
176 req->url,
177 "HTTP/" + std::to_string(req->httpMajor) + "." + std::to_string(req->httpMinor),
178 req->getQueries(),
179 req->getHeaders(),
180 req->getTrailer(),
181 req->getCookies(),
182 {})
183 << "\n";
184 });
185 },
186 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
187 VLOG(1) << "Session ended";
188 });
189
190 configurator(httpClient.getConfig());
191
192 httpClient.getConfig()->setRetry();
193 httpClient.getConfig()->setRetryBase(1);
194 httpClient.getConfig()->setReconnect();
195 httpClient.getConfig()->setDisabled();
196
197 httpClient.connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
198 reportState(name, socketAddress, state);
199 });
200
201 return httpClient;
202}
203
204static void createConfig(net::config::ConfigInstance* config) {
205 config->newSubCommand<mqtt::mqttcli::lib::ConfigSession>();
206 config->newSubCommand<mqtt::mqttcli::lib::ConfigSubscribe>();
207 config->newSubCommand<mqtt::mqttcli::lib::ConfigPublish>();
208
209 config->setRequireCallback([config]() {
210 if (!config->getDisabled() && config->getShowConfigTriggerApp() == nullptr &&
211 config->getParent()->getOption("--write-config")->count() == 0) {
212 const mqtt::mqttcli::lib::ConfigPublish* pubApp = config->getSubCommand<mqtt::mqttcli::lib::ConfigPublish>();
213 const mqtt::mqttcli::lib::ConfigSubscribe* subApp = config->getSubCommand<mqtt::mqttcli::lib::ConfigSubscribe>();
214
215 if (pubApp->getTopic().empty() && subApp->getTopic().empty()) {
216 throw CLI::RequiresError(config->getParent()->getName() + ":" + config->getInstanceName() +
217 " requires at least one of {sub | pub}",
218 CLI::ExitCodes::RequiresError);
219 }
220
221 if (!pubApp->getTopic().empty()) {
222 VLOG(0) << "[" << Color::Code::FG_LIGHT_GREEN << "Success" << Color::Code::FG_DEFAULT << "] " << "Bootstrap of "
223 << config->getInstanceName() << ":pub";
224 }
225
226 if (!subApp->getTopic().empty()) {
227 VLOG(0) << "[" << Color::Code::FG_LIGHT_GREEN << "Success" << Color::Code::FG_DEFAULT << "] " << "Bootstrap of "
228 << config->getInstanceName() << ":sub";
229 }
230 }
231 });
232}
233
234static void createWSConfig(net::config::ConfigInstance* config) {
235 createConfig(config);
236
237 config->getSubCommand<web::http::client::ConfigHTTP>()
238 ->addOption("--target", "Websocket endpoint", "string", "/ws", CLI::TypeValidator<std::string>())
239 ->configurable();
240}
241
242int main(int argc, char* argv[]) {
243 core::SNodeC::init(argc, argv);
244
245#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4)
246 startClient<net::in::stream::legacy::SocketClient>( //
247 "in-mqtt",
248 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
249 config->Remote::setPort(1883);
250 config->setDisableNagleAlgorithm();
251
252 createConfig(config); // cppcheck-suppress throwInEntryPoint
253 });
254#endif
255
256#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4)
257 startClient<net::in::stream::tls::SocketClient>( //
258 "in-mqtts",
259 [](net::in::stream::tls::config::ConfigSocketClient* config) {
260 config->Remote::setPort(1883);
261 config->setDisableNagleAlgorithm();
262
263 createConfig(config); // cppcheck-suppress throwInEntryPoint
264 });
265#endif
266
267#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6)
268 startClient<net::in6::stream::legacy::SocketClient>( //
269 "in6-mqtt",
270 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
271 config->Remote::setPort(1883);
272 config->setDisableNagleAlgorithm();
273
274 createConfig(config); // cppcheck-suppress throwInEntryPoint
275 });
276#endif
277
278#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6)
279 startClient<net::in6::stream::tls::SocketClient>( //
280 "in6-mqtts",
281 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
282 config->Remote::setPort(1883);
283 config->setDisableNagleAlgorithm();
284
285 createConfig(config); // cppcheck-suppress throwInEntryPoint
286 });
287#endif
288
289#if defined(CONFIG_MQTTSUITE_CLI_UNIX)
290 startClient<net::un::stream::legacy::SocketClient>( //
291 "un-mqtt",
292 [](net::un::stream::legacy::config::ConfigSocketClient* config) {
293 createConfig(config); // cppcheck-suppress throwInEntryPoint
294 });
295#endif
296
297#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS)
298 startClient<net::un::stream::tls::SocketClient>( //
299 "un-mqtts",
300 [](net::un::stream::tls::config::ConfigSocketClient* config) {
301 createConfig(config); // cppcheck-suppress throwInEntryPoint
302 });
303#endif
304
305#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WS)
306 startClient<web::http::legacy::in::Client>( //
307 "in-wsmqtt",
308 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
309 config->Remote::setPort(8080);
310 config->setDisableNagleAlgorithm();
311
312 createWSConfig(config);
313 });
314#endif
315
316#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WSS)
317 startClient<web::http::tls::in::Client>( //
318 "in-wsmqtts",
319 [](net::in::stream::tls::config::ConfigSocketClient* config) {
320 config->Remote::setPort(8088);
321 config->setDisableNagleAlgorithm();
322
323 createWSConfig(config);
324 });
325#endif
326
327#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WS)
328 startClient<web::http::legacy::in6::Client>( //
329 "in6-wsmqtt",
330 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
331 config->Remote::setPort(8080);
332 config->setDisableNagleAlgorithm();
333
334 createWSConfig(config);
335 });
336#endif
337
338#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WSS)
339 startClient<web::http::tls::in6::Client>( //
340 "in6-wsmqtts",
341 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
342 config->Remote::setPort(8088);
343 config->setDisableNagleAlgorithm();
344
345 createWSConfig(config);
346 });
347#endif
348
349#if defined(CONFIG_MQTTSUITE_CLI_UNIX) && defined(CONFIG_MQTTSUITE_CLI_WS)
350 startClient<web::http::legacy::un::Client>( //
351 "un-wsmqtt",
352 [](net::un::stream::legacy::config::ConfigSocketClient* config) {
353 createWSConfig(config);
354 });
355#endif
356
357#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS) && defined(CONFIG_MQTTSUITE_CLI_WSS)
358 startClient<web::http::tls::un::Client>( //
359 "un-wsmqtts",
360 [](net::un::stream::tls::config::ConfigSocketClient* config) {
361 createWSConfig(config);
362 });
363#endif
364
365 return core::SNodeC::start();
366}
int main(int argc, char *argv[])
static SocketClient< mqtt::mqttcli::SocketContextFactory > startClient(const std::string &instanceName, const std::function< void(typename SocketClient< mqtt::mqttcli::SocketContextFactory >::Config *)> &configurator)
Definition mqttcli.cpp:121
static HttpClient startClient(const std::string &name, const std::function< void(typename HttpClient::Config *)> &configurator)
Definition mqttcli.cpp:141
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
Definition mqttcli.cpp:87
static void logResponse(const std::shared_ptr< web::http::client::Request > &req, const std::shared_ptr< web::http::client::Response > &res)
Definition mqttcli.cpp:104
static void createWSConfig(net::config::ConfigInstance *config)
Definition mqttcli.cpp:234
static void createConfig(net::config::ConfigInstance *config)
Definition mqttcli.cpp:204