// Logs the outcome of a connection attempt for the instance `instanceName` via VLOG(1).
//   instanceName  - label placed at the start of every message
//   socketAddress - address rendered with toString() in the OK, ERROR and FATAL messages
//   state         - state being reported; state.what() supplies the ERROR/FATAL detail text
84reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
// OK: report the address the instance connected to.
86 case core::socket::State::OK:
87 VLOG(1) << instanceName <<
": connected to '" << socketAddress.toString() <<
"'";
// DISABLED: only the instance name is logged; the address is not used.
89 case core::socket::State::DISABLED:
90 VLOG(1) << instanceName <<
": disabled";
// ERROR and FATAL produce the same message shape: address followed by state.what().
92 case core::socket::State::ERROR:
93 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
95 case core::socket::State::FATAL:
96 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
// Builds a const HttpClient from a set of callbacks, applies the optional `configurator`
// to httpClient.getConfig(), then calls connect() and forwards the result to reportState().
//   name         - label captured by the connect callback and passed to reportState()
//   configurator - optional hook receiving HttpClient::Config&; skipped when it is nullptr (the default)
102void startClient(
const std::string& name,
const std::function<
void(
typename HttpClient::Config&)>& configurator =
nullptr) {
103 using SocketAddress =
typename HttpClient::SocketAddress;
105 const HttpClient httpClient(
// Callback receiving the MasterRequest: reads the connection name for use in later log lines.
107 [](
const std::shared_ptr<web::http::client::MasterRequest>& req) {
108 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
// Set "Sec-WebSocket-Protocol" to "mqtt" on the request.
110 req->set(
"Sec-WebSocket-Protocol",
"mqtt");
// Callback logging whether the HTTP upgrade start succeeded or failed.
115 [connectionName](
bool success) {
116 VLOG(1) << connectionName <<
": HTTP Upgrade (http -> websocket||"
117 <<
"mqtt" <<
") start " << (success ?
"success" :
"failed");
// Callback taking request, response and a success flag; all three are marked [[maybe_unused]].
119 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req,
120 [[maybe_unused]]
const std::shared_ptr<web::http::client::Response>& res,
121 [[maybe_unused]]
bool success) {
// Callback logging a request parse error together with the supplied message.
123 [connectionName](
const std::shared_ptr<web::http::client::Request>&,
const std::string& message) {
124 VLOG(1) << connectionName <<
": Request parse error: " << message;
// Callback that only logs that the session ended; its request argument is unused.
127 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req) {
128 VLOG(1) <<
"Session ended";
// Let the caller adjust the client configuration, if a configurator was supplied.
131 if (configurator !=
nullptr) {
132 configurator(httpClient.getConfig());
// Connect; the address/state pair handed to the callback is reported under `name`.
135 httpClient.connect([name](
const SocketAddress& socketAddress,
const core::socket::State& state) {
136 reportState(name, socketAddress, state);
// Program entry point. Calls the WebSocket factory link() functions in static-link builds,
// registers the --mqtt-mapping-file and --mqtt-session-store string options, calls
// core::SNodeC::init(), sets up one client per enabled CONFIG_MQTTSUITE_INTEGRATOR_*
// transport, and returns the result of core::SNodeC::start().
140int main(
int argc,
char* argv[]) {
// Call SocketContextUpgradeFactory::link() when either static-link macro is defined.
141#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
142 web::websocket::client::SocketContextUpgradeFactory::link();
// Register mqttClientSubProtocolFactory under the name "mqtt" when subprotocols are linked statically.
145#ifdef LINK_SUBPROTOCOL_STATIC
146 web::websocket::client::SubProtocolFactorySelector::link(
"mqtt", mqttClientSubProtocolFactory);
// Register the two string options. The trailing "" on --mqtt-session-store is presumably
// its default value - TODO confirm against utils::Config::addStringOption.
149 utils::Config::addStringOption(
"--mqtt-mapping-file",
"MQTT mapping file (json format) for integration",
"[path]");
150 utils::Config::addStringOption(
"--mqtt-session-store",
"Path to file for the persistent session store",
"[path]",
"");
152 core::SNodeC::init(argc, argv);
// Value of --mqtt-session-store; it is the last argument of each Client<...>(...) expression below.
154 std::string sessionStoreFileName = utils::Config::getStringOptionValue(
"--mqtt-session-store");
// Clients built on mqtt::mqttintegrator::SocketContextFactory, one per enabled transport.
// Each sets its remote endpoint, retry base 1 and reconnect, and reports connect() results
// through reportState() under a per-transport label.
156#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4)
157 net::in::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
160 config.Remote::setPort(1883);
163 config.setRetryBase(1);
164 config.setReconnect();
165 config.setDisableNagleAlgorithm();
167 sessionStoreFileName)
168 .connect([](
const auto& socketAddress,
const core::socket::State& state) {
169 reportState(
"in-mqtt", socketAddress, state);
// NOTE(review): this TLS client uses port 1883, the same as the non-TLS client above;
// IANA registers 8883 for secure-mqtt - confirm this is intended.
173#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4)
174 net::in::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
177 config.Remote::setPort(1883);
180 config.setRetryBase(1);
181 config.setReconnect();
182 config.setDisableNagleAlgorithm();
184 sessionStoreFileName)
185 .connect([](
const auto& socketAddress,
const core::socket::State& state) {
186 reportState(
"in-mqtts", socketAddress, state);
// IPv6 counterpart of the TCP client (net::in6 instead of net::in).
190#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6)
191 net::in6::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
194 config.Remote::setPort(1883);
197 config.setRetryBase(1);
198 config.setReconnect();
199 config.setDisableNagleAlgorithm();
201 sessionStoreFileName)
202 .connect([](
const auto& socketAddress,
const core::socket::State& state) {
203 reportState(
"in6-mqtt", socketAddress, state);
// NOTE(review): as with the IPv4 TLS client, the port here is 1883 rather than 8883 - confirm.
207#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6)
208 net::in6::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
211 config.Remote::setPort(1883);
214 config.setRetryBase(1);
215 config.setReconnect();
216 config.setDisableNagleAlgorithm();
218 sessionStoreFileName)
219 .connect([](
const auto& socketAddress,
const core::socket::State& state) {
220 reportState(
"in6-mqtts", socketAddress, state);
// UNIX-domain client: the remote endpoint is a socket path instead of a port.
224#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX)
225 net::un::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
228 config.Remote::setSunPath(
"/var/mqttbroker-un-mqtt");
231 config.setRetryBase(1);
232 config.setReconnect();
234 sessionStoreFileName)
235 .connect([](
const auto& socketAddress,
const core::socket::State& state) {
236 reportState(
"un-mqtt", socketAddress, state);
// NOTE(review): the TLS UNIX client uses the same socket path as the non-TLS one above -
// confirm that both are meant to reach the same endpoint.
240#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS)
241 net::un::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
244 config.Remote::setSunPath(
"/var/mqttbroker-un-mqtt");
247 config.setRetryBase(1);
248 config.setReconnect();
250 sessionStoreFileName)
251 .connect([](
const auto& socketAddress,
const core::socket::State& state) {
252 reportState(
"un-mqtts", socketAddress, state);
// Clients created through startClient<>(), each guarded by a transport macro plus *_WS or *_WSS.
// The *_WS variants set port 8080, the *_WSS variants set port 8088.
256#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
257 startClient<web::http::legacy::in::Client>(
"in-wsmqtt", [](
auto& config) {
258 config.Remote::setPort(8080);
261 config.setRetryBase(1);
262 config.setReconnect();
263 config.setDisableNagleAlgorithm();
267#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
268 startClient<web::http::tls::in::Client>(
"in-wsmqtts", [](
auto& config) {
269 config.Remote::setPort(8088);
272 config.setRetryBase(1);
273 config.setReconnect();
274 config.setDisableNagleAlgorithm();
278#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
279 startClient<web::http::legacy::in6::Client>(
"in6-wsmqtt", [](
auto& config) {
280 config.Remote::setPort(8080);
283 config.setRetryBase(1);
284 config.setReconnect();
285 config.setDisableNagleAlgorithm();
289#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
290 startClient<web::http::tls::in6::Client>(
"in6-wsmqtts", [](
auto& config) {
291 config.Remote::setPort(8088);
294 config.setRetryBase(1);
295 config.setReconnect();
296 config.setDisableNagleAlgorithm();
// UNIX-domain startClient<>() variants: their configurators set retry base and reconnect.
300#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
301 startClient<web::http::legacy::un::Client>(
"un-wsmqtt", [](
auto& config) {
303 config.setRetryBase(1);
304 config.setReconnect();
308#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
309 startClient<web::http::tls::un::Client>(
"un-wsmqtts", [](
auto& config) {
311 config.setRetryBase(1);
312 config.setReconnect();
// The value returned by core::SNodeC::start() becomes the process exit status.
316 return core::SNodeC::start();