// Logs, at VLOG(1), the outcome of a connect attempt for the client instance
// `instanceName`; the message is chosen by the value of `state`.
//
// instanceName  - prefix written at the start of every message.
// socketAddress - address rendered via toString() in the OK/ERROR/FATAL messages.
// state         - compared against core::socket::State::{OK, DISABLED, ERROR, FATAL};
//                 state.what() supplies the text for ERROR and FATAL.
//
// NOTE(review): this excerpt shows neither the return type, the `switch` header,
// any `break` statements, nor the closing braces -- confirm them against the
// complete file before editing.
87reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
// OK: report the address that was connected to.
89 case core::socket::State::OK:
90 VLOG(1) << instanceName <<
": connected to '" << socketAddress.toString() <<
"'";
// DISABLED: only the instance name is logged; the address is not used.
92 case core::socket::State::DISABLED:
93 VLOG(1) << instanceName <<
": disabled";
// ERROR: log the address together with the text from state.what().
95 case core::socket::State::ERROR:
96 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
// FATAL: emits exactly the same message, at the same verbosity, as ERROR.
98 case core::socket::State::FATAL:
99 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
// Writes one VLOG(1) record describing an HTTP exchange: the connection name,
// a "<method> <url> HTTP/<major>.<minor>" summary of the request, and then the
// httputils::toString() renderings of the request and of the response.
//
// req - request whose method, url and HTTP version are printed; also used to
//       reach the connection name through its socket context.
// res - response whose httpVersion, statusCode, reason, headers, cookies and
//       body are passed to httputils::toString().
//
// NOTE(review): req->getSocketContext() and getSocketConnection() are
// dereferenced without a null check -- confirm neither can be null here.
104static void logResponse(
const std::shared_ptr<web::http::client::Request>& req,
const std::shared_ptr<web::http::client::Response>& res) {
105 VLOG(1) << req->getSocketContext()->getSocketConnection()->getConnectionName() <<
" HTTP response for: " << req->method <<
" "
106 << req->url <<
" HTTP/" << req->httpMajor <<
"." << req->httpMinor <<
"\n"
// Rendering of the request. The version string is rebuilt from httpMajor and
// httpMinor. NOTE(review): the remaining arguments of this call are not
// visible in this excerpt.
107 << httputils::toString(req->method,
109 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
// Rendering of the response.
116 << httputils::toString(res->httpVersion, res->statusCode, res->reason, res->headers, res->cookies, res->body);
// Tail of the parameter list and body of the plain-socket client starter.
// NOTE(review): the template header, the return type and the `instanceName`
// parameter are not visible in this excerpt; the name `startClient` is inferred
// from the call sites in main() -- confirm against the complete file.
//
// configurator - callback taking the client's Config*; it is handed to the
//                core::socket::stream::Client constructor below.
122 const std::function<
void(
typename SocketClient<mqtt::mqttcli::
SocketContextFactory>::Config*)>& configurator) {
124 using SocketAddress =
typename Client::SocketAddress;
// Build the client from the instance name and the caller's configurator.
126 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator);
// Defaults applied after construction: retry, a retry base of 1, reconnect and
// disabled. NOTE(review): presumably these are defaults a user can override
// and the instance has to be enabled explicitly -- confirm against the
// configuration API.
128 socketClient.getConfig()->setRetry();
129 socketClient.getConfig()->setRetryBase(1);
130 socketClient.getConfig()->setReconnect();
131 socketClient.getConfig()->setDisabled();
// Start connecting; the callback forwards the result to reportState().
// instanceName is captured by value, so the callback holds its own copy.
133 socketClient.connect([instanceName](
const SocketAddress& socketAddress,
const core::socket::State& state) {
134 reportState(instanceName, socketAddress, state);
// HTTP variant of the client starter: constructs an HttpClient with a set of
// lambdas, applies `configurator` and a few defaults to its config, and then
// calls connect().
//
// name         - instance name; captured by the connect callback and passed to
//                reportState().
// configurator - callback invoked with httpClient.getConfig().
//
// NOTE(review): the template header, several constructor arguments, the
// lambda closers and the return statement are not visible in this excerpt.
141static HttpClient
startClient(
const std::string& name,
const std::function<
void(
typename HttpClient::Config*)>& configurator) {
142 using SocketAddress =
typename HttpClient::SocketAddress;
144 const HttpClient httpClient(
// First lambda: receives the MasterRequest and prepares the upgrade request.
146 [](
const std::shared_ptr<web::http::client::MasterRequest>& req) {
// connectionName is captured by value in the nested lambdas below.
147 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
// Looks up the "--target" option on the connection's ConfigHTTP sub-command.
// NOTE(review): the end of this expression is not visible in this excerpt.
148 const std::string target = req->getSocketContext()
149 ->getSocketConnection()
150 ->getConfigInstance()
151 ->getSubCommand<web::http::client::ConfigHTTP>()
152 ->getOption(
"--target")
// Sets the Sec-WebSocket-Protocol request field to "mqtt".
155 req->set(
"Sec-WebSocket-Protocol",
"mqtt");
// Lambda logging whether the upgrade was started successfully.
// NOTE(review): the call that receives this lambda is not visible here.
160 [connectionName](
bool success) {
161 VLOG(1) << connectionName <<
": HTTP Upgrade (http -> websocket||"
162 <<
"mqtt" <<
") start " << (success ?
"success" :
"failed");
// Lambda that logs the response through logResponse() and then the result.
// NOTE(review): `success` is used below, but its declaration is not visible
// in this excerpt.
164 [connectionName](
const std::shared_ptr<web::http::client::Request>& req,
165 const std::shared_ptr<web::http::client::Response>& res,
167 logResponse(req, res);
169 VLOG(1) << connectionName <<
": HTTP Upgrade " << (success ?
"success" :
"failed");
// Lambda logging a response parse error, followed by the request concerned.
171 [connectionName](
const std::shared_ptr<web::http::client::Request>& req,
const std::string& message) {
172 VLOG(1) << connectionName <<
": Response parse error: " << message;
173 VLOG(1) <<
" Request was: " << req->method <<
" " << req->url <<
" HTTP/" << req->httpMajor <<
"." << req->httpMinor
175 << httputils::toString(req->method,
177 "HTTP/" + std::to_string(req->httpMajor) +
"." + std::to_string(req->httpMinor),
// Last visible lambda: ignores its request argument and logs "Session ended".
186 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req) {
187 VLOG(1) <<
"Session ended";
// Caller-supplied configuration runs first; the four setters below run after
// it. NOTE(review): confirm whether they are meant to act as defaults or to
// override what the configurator set.
190 configurator(httpClient.getConfig());
192 httpClient.getConfig()->setRetry();
193 httpClient.getConfig()->setRetryBase(1);
194 httpClient.getConfig()->setReconnect();
195 httpClient.getConfig()->setDisabled();
// Start connecting; the callback forwards the result to reportState().
197 httpClient.connect([name](
const SocketAddress& socketAddress,
const core::socket::State& state) {
198 reportState(name, socketAddress, state);
// Registers a require-callback on `config` that checks the pub/sub topics.
// NOTE(review): the enclosing function header and the definitions of `pubApp`
// and `subApp` are not visible in this excerpt.
209 config->setRequireCallback([config]() {
// Validate only when the instance is not disabled, getShowConfigTriggerApp()
// returns nullptr, and "--write-config" was not given on the parent.
210 if (!config->getDisabled() && config->getShowConfigTriggerApp() ==
nullptr &&
211 config->getParent()->getOption(
"--write-config")->count() == 0) {
// Both topics empty: raise a CLI::RequiresError naming parent and instance.
215 if (pubApp
->getTopic().empty() && subApp->getTopic().empty()) {
216 throw CLI::RequiresError(config->getParent()->getName() +
":" + config->getInstanceName() +
217 " requires at least one of {sub | pub}",
218 CLI::ExitCodes::RequiresError);
// Success message for the ":pub" part.
// NOTE(review): the condition guarding this message is not visible here.
222 VLOG(0) <<
"[" << Color::Code::FG_LIGHT_GREEN <<
"Success" << Color::Code::FG_DEFAULT <<
"] " <<
"Bootstrap of "
223 << config->getInstanceName() <<
":pub";
// Success message for the ":sub" part, logged when a sub topic is set.
226 if (!subApp->getTopic().empty()) {
227 VLOG(0) <<
"[" << Color::Code::FG_LIGHT_GREEN <<
"Success" << Color::Code::FG_DEFAULT <<
"] " <<
"Bootstrap of "
228 << config->getInstanceName() <<
":sub";
// Program entry point: initialises the framework with the command line,
// registers one client per transport selected by the CONFIG_MQTTSUITE_CLI_*
// macros, and returns the result of core::SNodeC::start().
//
// NOTE(review): the instance-name arguments, the lambda closers and the
// matching #endif lines are not visible in this excerpt.
242int main(
int argc,
char* argv[]) {
243 core::SNodeC::init(argc, argv);
// Plain MQTT over IPv4: remote port 1883, setDisableNagleAlgorithm() called.
245#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4)
246 startClient<net::in::stream::legacy::SocketClient>(
248 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
249 config->Remote::setPort(1883);
250 config->setDisableNagleAlgorithm();
252 createConfig(config);
// MQTT over TLS, IPv4.
// NOTE(review): this also sets port 1883; the IANA-registered port for MQTT
// over TLS is 8883 -- confirm that 1883 is intentional.
256#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4)
257 startClient<net::in::stream::tls::SocketClient>(
259 [](net::in::stream::tls::config::ConfigSocketClient* config) {
260 config->Remote::setPort(1883);
261 config->setDisableNagleAlgorithm();
263 createConfig(config);
// Plain MQTT over IPv6: same settings as the IPv4 variant.
267#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6)
268 startClient<net::in6::stream::legacy::SocketClient>(
270 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
271 config->Remote::setPort(1883);
272 config->setDisableNagleAlgorithm();
274 createConfig(config);
// MQTT over TLS, IPv6.
// NOTE(review): same port question as the IPv4 TLS variant (1883 vs. 8883).
278#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6)
279 startClient<net::in6::stream::tls::SocketClient>(
281 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
282 config->Remote::setPort(1883);
283 config->setDisableNagleAlgorithm();
285 createConfig(config);
// Unix-domain socket variants: the lambdas only call createConfig().
289#if defined(CONFIG_MQTTSUITE_CLI_UNIX)
290 startClient<net::un::stream::legacy::SocketClient>(
292 [](net::un::stream::legacy::config::ConfigSocketClient* config) {
293 createConfig(config);
297#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS)
298 startClient<net::un::stream::tls::SocketClient>(
300 [](net::un::stream::tls::config::ConfigSocketClient* config) {
301 createConfig(config);
// HTTP-client (WS/WSS) variants: port 8080 for the legacy clients, 8088 for
// the TLS clients; each lambda calls createWSConfig() instead of createConfig().
305#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WS)
306 startClient<web::http::legacy::in::Client>(
308 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
309 config->Remote::setPort(8080);
310 config->setDisableNagleAlgorithm();
312 createWSConfig(config);
316#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV4) && defined(CONFIG_MQTTSUITE_CLI_WSS)
317 startClient<web::http::tls::in::Client>(
319 [](net::in::stream::tls::config::ConfigSocketClient* config) {
320 config->Remote::setPort(8088);
321 config->setDisableNagleAlgorithm();
323 createWSConfig(config);
327#if defined(CONFIG_MQTTSUITE_CLI_TCP_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WS)
328 startClient<web::http::legacy::in6::Client>(
330 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
331 config->Remote::setPort(8080);
332 config->setDisableNagleAlgorithm();
334 createWSConfig(config);
338#if defined(CONFIG_MQTTSUITE_CLI_TLS_IPV6) && defined(CONFIG_MQTTSUITE_CLI_WSS)
339 startClient<web::http::tls::in6::Client>(
341 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
342 config->Remote::setPort(8088);
343 config->setDisableNagleAlgorithm();
345 createWSConfig(config);
// HTTP-client variants over Unix-domain sockets: no port is set.
349#if defined(CONFIG_MQTTSUITE_CLI_UNIX) && defined(CONFIG_MQTTSUITE_CLI_WS)
350 startClient<web::http::legacy::un::Client>(
352 [](net::un::stream::legacy::config::ConfigSocketClient* config) {
353 createWSConfig(config);
357#if defined(CONFIG_MQTTSUITE_CLI_UNIX_TLS) && defined(CONFIG_MQTTSUITE_CLI_WSS)
358 startClient<web::http::tls::un::Client>(
360 [](net::un::stream::tls::config::ConfigSocketClient* config) {
361 createWSConfig(config);
// The value returned by core::SNodeC::start() becomes the process exit status.
365 return core::SNodeC::start();