88reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
90 case core::socket::State::OK:
91 VLOG(1) << instanceName <<
": connected to '" << socketAddress.toString() <<
"'";
93 case core::socket::State::DISABLED:
94 VLOG(1) << instanceName <<
": disabled";
96 case core::socket::State::ERROR:
97 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
99 case core::socket::State::FATAL:
100 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
105static void logResponse(
const std::shared_ptr<web::http::client::Request>& req,
const std::shared_ptr<web::http::client::Response>& res) {
106 VLOG(1) << req->getSocketContext()->getSocketConnection()->getConnectionName() <<
" HTTP response for: " << req->method <<
" "
107 << req->url <<
" HTTP/" << req->httpMajor <<
"." << req->httpMinor <<
"\n"
108 << httputils::toString(req->method,
110 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
117 << httputils::toString(res->httpVersion, res->statusCode, res->reason, res->headers, res->cookies, res->body);
121void startClient(
const std::string& name,
const std::function<
void(
typename HttpClient::Config&)>& configurator) {
122 using SocketAddress =
typename HttpClient::SocketAddress;
124 const HttpClient httpClient(
126 [](
const std::shared_ptr<web::http::client::MasterRequest>& req) {
127 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
128 const std::string target = req->getSocketContext()
129 ->getSocketConnection()
130 ->getConfigInstance()
132 ->get_option(
"--target")
135 req->set(
"Sec-WebSocket-Protocol",
"mqtt");
140 [connectionName](
bool success) {
141 VLOG(1) << connectionName <<
": HTTP Upgrade (http -> websocket||"
142 <<
"mqtt" <<
") start " << (success ?
"success" :
"failed");
144 [connectionName](
const std::shared_ptr<web::http::client::Request>& req,
145 const std::shared_ptr<web::http::client::Response>& res,
147 logResponse(req, res);
149 VLOG(1) << connectionName <<
": HTTP Upgrade " << (success ?
"success" :
"failed");
151 [connectionName]([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req,
const std::string& message) {
152 VLOG(1) << connectionName <<
": Response parse error: " << message;
153 VLOG(1) <<
" Request was: " << req->method <<
" " << req->url <<
" HTTP/" << req->httpMajor <<
"." << req->httpMinor
155 << httputils::toString(req->method,
157 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
166 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req) {
167 VLOG(1) <<
"Session ended";
170 configurator(httpClient.getConfig());
172 httpClient.connect([name](
const SocketAddress& socketAddress,
const core::socket::State& state) {
173 reportState(name, socketAddress, state);
177static void createConfig(CLI::App* sessionApp, CLI::App* subApp, CLI::App* pubApp) {
178 sessionApp->configurable(
false);
179 subApp->configurable(
false);
180 pubApp->configurable(
false);
182 CLI::Option* clientIdOpt = sessionApp->add_option(
"--client-id",
"MQTT Client-ID")
183 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
184 ->type_name(
"string");
186 sessionApp->add_option(
"--qos",
"Quality of service")
187 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
188 ->type_name(
"uint8_t")
192 sessionApp->add_flag(
"--retain-session{true},-r{true}",
"Clean session")
193 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
195 ->default_str(
"false")
196 ->check(CLI::IsMember({
"true",
"false"}))
198 ->needs(clientIdOpt);
200 sessionApp->add_option(
"--keep-alive",
"Quality of service")
201 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
202 ->type_name(
"uint8_t")
206 sessionApp->add_option(
"--will-topic",
"MQTT will topic")
207 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
208 ->type_name(
"string")
211 sessionApp->add_option(
"--will-message",
"MQTT will message")
212 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
213 ->type_name(
"string")
216 sessionApp->add_option(
"--will-qos",
"MQTT will quality of service")
217 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
218 ->type_name(
"uint8_t")
222 sessionApp->add_flag(
"--will-retain{true}",
"MQTT will message retain")
223 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
224 ->default_str(
"false")
226 ->check(CLI::IsMember({
"true",
"false"}))
229 sessionApp->add_option(
"--username",
"MQTT username")
230 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
231 ->type_name(
"string")
234 sessionApp->add_option(
"--password",
"MQTT password")
235 ->group(sessionApp->get_formatter()->get_label(
"Persistent Options"))
236 ->type_name(
"string")
240 ->add_option_function<std::string>(
242 [subApp](
const std::string& value) {
244 subApp->get_option(
"--topic")->required(
false)->clear();
245 subApp->remove_needs(subApp->get_option(
"--topic"));
248 "List of topics subscribing to")
249 ->group(subApp->get_formatter()->get_label(
"Persistent Options"))
250 ->type_name(
"string list")
257 ->add_option_function<std::string>(
259 [pubApp](
const std::string& value) {
261 pubApp->get_option(
"--topic")->required(
false)->clear();
262 pubApp->remove_needs(pubApp->get_option(
"--topic"));
264 pubApp->get_option(
"--message")->required(
false)->clear();
265 pubApp->remove_needs(pubApp->get_option(
"--message"));
268 "Topic publishing to")
269 ->group(pubApp->get_formatter()->get_label(
"Persistent Options"))
270 ->type_name(
"string")
275 ->add_option_function<std::string>(
277 [pubApp](
const std::string& value) {
279 pubApp->get_option(
"--topic")->required(
false)->clear();
280 pubApp->remove_needs(pubApp->get_option(
"--topic"));
282 pubApp->get_option(
"--message")->required(
false)->clear();
283 pubApp->remove_needs(pubApp->get_option(
"--message"));
286 "Message to publish")
287 ->group(pubApp->get_formatter()->get_label(
"Persistent Options"))
288 ->type_name(
"string")
292 pubApp->add_flag(
"--retain{true},-r{true}",
"Retain message")
293 ->group(pubApp->get_formatter()->get_label(
"Persistent Options"))
294 ->default_str(
"false")
296 ->check(CLI::IsMember({
"true",
"false"}))
301 createConfig(config.addSection(
"session",
"MQTT session behavior",
"Connection")
,
302 config.addSection(
"sub",
"Configuration for application mqttsub",
"Applications")
,
303 config.addSection(
"pub",
"Configuration for application mqttpub",
"Applications")
);
305 config.get()->require_callback([config = &config]() {
306 if (!config->getDisabled() && utils::Config::showConfigTriggerApp ==
nullptr &&
307 utils::Config::app->get_option(
"--write-config")->count() == 0) {
308 CLI::App* pubApp = config->getSection(
"pub",
true,
true);
309 CLI::App* subApp = config->getSection(
"sub",
true,
true);
311 if ((pubApp ==
nullptr || (*pubApp)[
"--topic"]->count() == 0 || (*pubApp)[
"--message"]->count() == 0) &&
312 (subApp ==
nullptr || (*subApp)[
"--topic"]->count() == 0)) {
313 throw CLI::RequiresError(config->get()->get_parent()->get_name() +
":" + config->getInstanceName() +
314 " requires at least one of {sub | pub}",
315 CLI::ExitCodes::RequiresError);
318 if (pubApp !=
nullptr) {
319 VLOG(0) <<
"[" << Color::Code::FG_LIGHT_GREEN <<
"Success" << Color::Code::FG_DEFAULT <<
"] " <<
"Bootstrap of "
320 << config->getInstanceName() <<
":pub";
323 if (subApp !=
nullptr) {
324 VLOG(0) <<
"[" << Color::Code::FG_LIGHT_GREEN <<
"Success" << Color::Code::FG_DEFAULT <<
"] " <<
"Bootstrap of "
325 << config->getInstanceName() <<
":sub";
342int main(
int argc,
char* argv[]) {
343 core::SNodeC::init(argc, argv);
345#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
346 web::websocket::client::SocketContextUpgradeFactory::link();
349#ifdef LINK_SUBPROTOCOL_STATIC
350 web::websocket::client::SubProtocolFactorySelector::link(
"mqtt", mqttClientSubProtocolFactory);
353 utils::Config::app->get_formatter()->label(
"SUBCOMMAND",
"APPLICATION | CONNECTION | INSTANCE");
354 utils::Config::app->get_formatter()->label(
"SUBCOMMANDS",
"APPLICATION | CONNECTION | INSTANCES");
356 createConfig(utils::Config::addInstance(
"session",
"MQTT session behavior",
"Connection",
true),
357 utils::Config::addInstance(
"sub",
"Configuration for application mqttsub",
"Applications",
true),
358 utils::Config::addInstance(
"pub",
"Configuration for application mqttpub",
"Applications",
true));
362#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4)
363 net::in::stream::legacy::Client<mqtt::mqttcli::SocketContextFactory>(
"in-mqtt", [](
auto& config) {
364 config.Remote::setPort(1883);
367 config.setRetryBase(1);
368 config.setDisableNagleAlgorithm();
370 createConfig(config);
371 }).connect([](
const auto& socketAddress,
const core::socket::State& state) {
372 reportState(
"in-mqtt", socketAddress, state);
376#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4)
377 net::in::stream::tls::Client<mqtt::mqttcli::SocketContextFactory>(
"in-mqtts", [](
auto& config) {
378 config.Remote::setPort(1883);
381 config.setRetryBase(1);
382 config.setDisableNagleAlgorithm();
383 config.setDisabled();
385 createConfig(config);
386 }).connect([](
const auto& socketAddress,
const core::socket::State& state) {
387 reportState(
"in-mqtts", socketAddress, state);
391#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6)
392 net::in6::stream::legacy::Client<mqtt::mqttcli::SocketContextFactory>(
"in6-mqtt", [](
auto& config) {
393 config.Remote::setPort(1883);
396 config.setRetryBase(1);
397 config.setDisableNagleAlgorithm();
398 config.setDisabled();
400 createConfig(config);
401 }).connect([](
const auto& socketAddress,
const core::socket::State& state) {
402 reportState(
"in6-mqtt", socketAddress, state);
406#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6)
407 net::in6::stream::tls::Client<mqtt::mqttcli::SocketContextFactory>(
"in6-mqtts", [](
auto& config) {
408 config.Remote::setPort(1883);
411 config.setRetryBase(1);
412 config.setDisableNagleAlgorithm();
413 config.setDisabled();
415 createConfig(config);
416 }).connect([](
const auto& socketAddress,
const core::socket::State& state) {
417 reportState(
"in6-mqtts", socketAddress, state);
421#if defined(CONFIG_MQTTSUITE_CLI_UNIX)
422 net::un::stream::legacy::Client<mqtt::mqttcli::SocketContextFactory>(
"un-mqtt", [](
auto& config) {
423 config.Remote::setSunPath(
"/var/mqttbroker-un-mqtt");
426 config.setRetryBase(1);
427 config.setDisabled();
429 createConfig(config);
430 }).connect([](
const auto& socketAddress,
const core::socket::State& state) {
431 reportState(
"un-mqtt", socketAddress, state);
435#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS)
436 net::un::stream::tls::Client<mqtt::mqttcli::SocketContextFactory>(
"un-mqtts", [](
auto& config) {
437 config.Remote::setSunPath(
"/var/mqttbroker-un-mqtts");
440 config.setRetryBase(1);
441 config.setDisabled();
443 createConfig(config);
444 }).connect([](
const auto& socketAddress,
const core::socket::State& state) {
445 reportState(
"un-mqtts", socketAddress, state);
449#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WS)
450 startClient<web::http::legacy::in::Client>(
"in-wsmqtt", [](
auto& config) {
451 config.Remote::setPort(8080);
454 config.setRetryBase(1);
455 config.setDisableNagleAlgorithm();
456 config.setDisabled();
458 createWSConfig(config);
462#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WSS)
463 startClient<web::http::tls::in::Client>(
"in-wsmqtts", [](
auto& config) {
464 config.Remote::setPort(8088);
467 config.setRetryBase(1);
468 config.setDisableNagleAlgorithm();
469 config.setDisabled();
471 createWSConfig(config);
475#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WS)
476 startClient<web::http::legacy::in6::Client>(
"in6-wsmqtt", [](
auto& config) {
477 config.Remote::setPort(8080);
480 config.setRetryBase(1);
481 config.setDisableNagleAlgorithm();
482 config.setDisabled();
484 createWSConfig(config);
488#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WSS)
489 startClient<web::http::tls::in6::Client>(
"in6-wsmqtts", [](
auto& config) {
490 config.Remote::setPort(8088);
492 config.setReconnect();
494 config.setRetryBase(1);
495 config.setDisableNagleAlgorithm();
496 config.setDisabled();
498 createWSConfig(config);
502#if defined(CONFIG_MQTTSUITE_CLI_UNIX) && defined(CONFIG_MQTTSUITE_CLI_WS)
503 startClient<web::http::legacy::un::Client>(
"un-wsmqtt", [](
auto& config) {
505 config.setRetryBase(1);
506 config.setReconnect();
507 config.setDisabled();
509 createWSConfig(config);
513#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS) && defined(CONFIG_MQTTSUITE_CLI_WSS)
514 startClient<web::http::tls::un::Client>(
"un-wsmqtts", [](
auto& config) {
516 config.setRetryBase(1);
517 config.setReconnect();
518 config.setDisabled();
520 createWSConfig(config);
524 return core::SNodeC::start();