95reportState(
const std::string& instanceName,
const core::socket::SocketAddress& socketAddress,
const core::socket::State& state) {
97 case core::socket::State::OK:
98 VLOG(1) << instanceName <<
": connected to '" << socketAddress.toString() <<
"'";
100 case core::socket::State::DISABLED:
101 VLOG(1) << instanceName <<
": disabled";
103 case core::socket::State::ERROR:
104 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
106 case core::socket::State::FATAL:
107 VLOG(1) << instanceName <<
": " << socketAddress.toString() <<
": " << state.what();
115 const std::function<
void(
typename SocketClientT<mqtt::mqttintegrator::
SocketContextFactory>::Config*)>& configurator,
118 using SocketAddress =
typename Client::SocketAddress;
120 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator, std::forward<Args>(args)...);
122 socketClient.getConfig()->setRetry();
123 socketClient.getConfig()->setRetryBase(1);
124 socketClient.getConfig()->setReconnect();
126 socketClient.connect([instanceName](
const SocketAddress& socketAddress,
const core::socket::State& state) {
127 reportState(instanceName, socketAddress, state);
134HttpClient
startClient(
const std::string& name,
const std::function<
void(
typename HttpClient::Config*)>& configurator =
nullptr) {
135 using SocketAddress =
typename HttpClient::SocketAddress;
137 const HttpClient httpClient(
139 [](
const std::shared_ptr<web::http::client::MasterRequest>& req) {
140 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
142 req->set(
"Sec-WebSocket-Protocol",
"mqtt");
147 [connectionName](
bool success) {
148 VLOG(1) << connectionName <<
": HTTP Upgrade (http -> websocket||"
149 <<
"mqtt" <<
") start " << (success ?
"success" :
"failed");
151 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req,
152 [[maybe_unused]]
const std::shared_ptr<web::http::client::Response>& res,
153 [[maybe_unused]]
bool success) {
155 [connectionName](
const std::shared_ptr<web::http::client::Request>&,
const std::string& message) {
156 VLOG(1) << connectionName <<
": Request parse error: " << message;
159 []([[maybe_unused]]
const std::shared_ptr<web::http::client::Request>& req) {
160 VLOG(1) <<
"Session ended";
163 if (configurator !=
nullptr) {
164 configurator(httpClient.getConfig());
167 httpClient.getConfig()->setRetry();
168 httpClient.getConfig()->setRetryBase(1);
169 httpClient.getConfig()->setReconnect();
171 httpClient.connect([name](
const SocketAddress& socketAddress,
const core::socket::State& state) {
172 reportState(name, socketAddress, state);
178int main(
int argc,
char* argv[]) {
181 core::SNodeC::init(argc, argv);
185 mqtt::lib::admin::makeMappingAdminRouter(configMqttIntegrator, mqtt::lib::admin::AdminOptions{}, [](
bool mustReconnect) {
189 express::legacy::in::Server(
"in-http", router,
reportState, [](net::in::stream::legacy::config::ConfigSocketServer* config) {
190 config->setPort(8085);
194 express::tls::in::Server(
"in-https", router,
reportState, [](net::in::stream::tls::config::ConfigSocketServer* config) {
195 config->setPort(8086);
199#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4)
200 startClient<net::in::stream::legacy::SocketClient>(
202 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
203 config->Remote::setPort(1883);
205 config->setDisableNagleAlgorithm();
209#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4)
210 startClient<net::in::stream::tls::SocketClient>(
212 [](net::in::stream::tls::config::ConfigSocketClient* config) {
213 config->Remote::setPort(1883);
214 config->setDisableNagleAlgorithm();
218#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6)
219 startClient<net::in6::stream::legacy::SocketClient>(
221 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
222 config->Remote::setPort(1883);
223 config->setDisableNagleAlgorithm();
227#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6)
228 startClient<net::in6::stream::tls::SocketClient>(
230 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
231 config->Remote::setPort(1883);
232 config->setDisableNagleAlgorithm();
236#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX)
237 startClient<net::un::stream::legacy::SocketClient>(
239 []([[maybe_unused]]
const net::un::stream::legacy::config::ConfigSocketClient* config) {
243#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS)
244 startClient<net::un::stream::tls::SocketClient>(
246 []([[maybe_unused]]
const net::un::stream::tls::config::ConfigSocketClient* config) {
250#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
251 startClient<web::http::legacy::in::Client>(
253 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
254 config->Remote::setPort(8080);
255 config->setDisableNagleAlgorithm();
259#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
260 startClient<web::http::tls::in::Client>(
262 [](net::in::stream::tls::config::ConfigSocketClient* config) {
263 config->Remote::setPort(8088);
264 config->setDisableNagleAlgorithm();
268#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
269 startClient<web::http::legacy::in6::Client>(
271 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
272 config->Remote::setPort(8080);
273 config->setDisableNagleAlgorithm();
277#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
278 startClient<web::http::tls::in6::Client>(
280 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
281 config->Remote::setPort(8088);
282 config->setDisableNagleAlgorithm();
286#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
287 startClient<web::http::legacy::un::Client>(
289 []([[maybe_unused]]
const net::un::stream::legacy::config::ConfigSocketClient* config) {
293#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
294 startClient<web::http::tls::un::Client>(
296 []([[maybe_unused]]
const net::un::stream::tls::config::ConfigSocketClient* config) {
300 return core::SNodeC::start();