MQTTSuite
Loading...
Searching...
No Matches
mqttintegrator.cpp File Reference
#include "SocketContextFactory.h"
#include "config.h"
#include "lib/ConfigApplication.h"
#include <core/SNodeC.h>
#include <utils/Config.h>
#include <net/in/stream/legacy/SocketClient.h>
#include <net/in/stream/tls/SocketClient.h>
#include <net/in6/stream/legacy/SocketClient.h>
#include <net/in6/stream/tls/SocketClient.h>
#include <net/un/stream/legacy/SocketClient.h>
#include <net/un/stream/tls/SocketClient.h>
#include <web/http/legacy/in/Client.h>
#include <web/http/legacy/in6/Client.h>
#include <web/http/legacy/un/Client.h>
#include <web/http/tls/in/Client.h>
#include <web/http/tls/in6/Client.h>
#include <web/http/tls/un/Client.h>
#include <express/legacy/in/Server.h>
#include <express/tls/in/Server.h>
#include "lib/MappingAdminRouter.h"
#include "lib/Mqtt.h"
Include dependency graph for mqttintegrator.cpp:

Go to the source code of this file.

Functions

static void reportState (const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
template<template< typename SocketContextFactoryT, typename... ArgsT > typename SocketClientT, typename... Args>
static SocketClientT< mqtt::mqttintegrator::SocketContextFactory, Args... > startClient (const std::string &instanceName, const std::function< void(typename SocketClientT< mqtt::mqttintegrator::SocketContextFactory >::Config *)> &configurator, Args &&... args)
template<typename HttpClient>
HttpClient startClient (const std::string &name, const std::function< void(typename HttpClient::Config *)> &configurator=nullptr)
int main (int argc, char *argv[])

Function Documentation

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 178 of file mqttintegrator.cpp.

178 {
179 mqtt::lib::ConfigMqttIntegrator* configMqttIntegrator = utils::Config::configRoot.newSubCommand<mqtt::lib::ConfigMqttIntegrator>();
180
181 core::SNodeC::init(argc, argv);
182
183 // Instantiate Admin Router for Mapping Management
184 express::Router router =
185 mqtt::lib::admin::makeMappingAdminRouter(configMqttIntegrator, mqtt::lib::admin::AdminOptions{}, [](bool mustReconnect) {
187 });
188
189 express::legacy::in::Server("in-http", router, reportState, [](net::in::stream::legacy::config::ConfigSocketServer* config) {
190 config->setPort(8085);
191 config->setRetry();
192 });
193
194 express::tls::in::Server("in-https", router, reportState, [](net::in::stream::tls::config::ConfigSocketServer* config) {
195 config->setPort(8086);
196 config->setRetry();
197 });
198
199#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4)
201 "in-mqtt",
202 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
203 config->Remote::setPort(1883);
204
205 config->setDisableNagleAlgorithm();
206 });
207#endif // CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4
208
209#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4)
211 "in-mqtts",
212 [](net::in::stream::tls::config::ConfigSocketClient* config) {
213 config->Remote::setPort(1883);
214 config->setDisableNagleAlgorithm();
215 });
216#endif
217
218#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6)
220 "in6-mqtt",
221 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
222 config->Remote::setPort(1883);
223 config->setDisableNagleAlgorithm();
224 });
225#endif
226
227#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6)
229 "in6-mqtts",
230 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
231 config->Remote::setPort(1883);
232 config->setDisableNagleAlgorithm();
233 });
234#endif
235
236#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX)
238 "un-mqtt",
239 []([[maybe_unused]] const net::un::stream::legacy::config::ConfigSocketClient* config) {
240 });
241#endif
242
243#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS)
245 "un-mqtts",
246 []([[maybe_unused]] const net::un::stream::tls::config::ConfigSocketClient* config) {
247 });
248#endif
249
250#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
252 "in-wsmqtt",
253 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
254 config->Remote::setPort(8080);
255 config->setDisableNagleAlgorithm();
256 });
257#endif
258
259#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
261 "in-wsmqtts",
262 [](net::in::stream::tls::config::ConfigSocketClient* config) {
263 config->Remote::setPort(8088);
264 config->setDisableNagleAlgorithm();
265 });
266#endif
267
268#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
270 "in6-wsmqtt",
271 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
272 config->Remote::setPort(8080);
273 config->setDisableNagleAlgorithm();
274 });
275#endif
276
277#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
279 "in6-wsmqtts",
280 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
281 config->Remote::setPort(8088);
282 config->setDisableNagleAlgorithm();
283 });
284#endif
285
286#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
288 "un-wsmqtt",
289 []([[maybe_unused]] const net::un::stream::legacy::config::ConfigSocketClient* config) {
290 });
291#endif
292
293#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
295 "un-wsmqtts",
296 []([[maybe_unused]] const net::un::stream::tls::config::ConfigSocketClient* config) {
297 });
298#endif
299
300 return core::SNodeC::start();
301}
static mqtt::lib::admin::ReloadResult updateSubscriptions(bool mustReconnect)
Definition Mqtt.cpp:89
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
static SocketClientT< mqtt::mqttintegrator::SocketContextFactory, Args... > startClient(const std::string &instanceName, const std::function< void(typename SocketClientT< mqtt::mqttintegrator::SocketContextFactory >::Config *)> &configurator, Args &&... args)
express::Router makeMappingAdminRouter(ConfigApplication *configApplication, const AdminOptions &opt, ReloadCallback onDeploy)

References reportState(), and mqtt::mqttintegrator::lib::Mqtt::updateSubscriptions().

Here is the call graph for this function:

◆ reportState()

void reportState ( const std::string & instanceName,
const core::socket::SocketAddress & socketAddress,
const core::socket::State & state )
static

Definition at line 95 of file mqttintegrator.cpp.

// Body of reportState(): writes one VLOG(1) line describing the socket state
// reported for the instance named instanceName.
//   - OK:       logs that the instance is connected to socketAddress.
//   - DISABLED: logs only the instance name.
//   - ERROR / FATAL: log socketAddress together with state.what().
// The function returns nothing; logging is its only visible effect.
95 {
96 switch (state) {
97 case core::socket::State::OK:
// Success path: include the address the instance is connected to.
98 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
99 break;
100 case core::socket::State::DISABLED:
// No address is printed for a disabled instance.
101 VLOG(1) << instanceName << ": disabled";
102 break;
103 case core::socket::State::ERROR:
// ERROR and FATAL emit the same message layout: address, then state.what().
104 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
105 break;
106 case core::socket::State::FATAL:
107 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
108 break;
109 }
110}

Referenced by main().

Here is the caller graph for this function:

◆ startClient() [1/2]

template<template< typename SocketContextFactoryT, typename... ArgsT > typename SocketClientT, typename... Args>
SocketClientT< mqtt::mqttintegrator::SocketContextFactory, Args... > startClient ( const std::string & instanceName,
const std::function< void(typename SocketClientT< mqtt::mqttintegrator::SocketContextFactory >::Config *)> & configurator,
Args &&... args )
static

Definition at line 114 of file mqttintegrator.cpp.

// Body of startClient() [socket-client overload]:
//   1. builds a client of type SocketClientT<SocketContextFactory, Args...> through
//      core::socket::stream::Client<Client>, passing instanceName, the configurator
//      and the perfectly forwarded extra arguments,
//   2. switches on retry and reconnect on the client's config,
//   3. starts connect() with a callback that forwards every state to reportState(),
//   4. returns the client by value.
116 {
117 using Client = SocketClientT<mqtt::mqttintegrator::SocketContextFactory, Args...>;
118 using SocketAddress = typename Client::SocketAddress;
119
// The configurator is handed to the factory here rather than being called in this body.
120 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator, std::forward<Args>(args)...);
121
// These setters run after the client (and its config) has been created.
// NOTE(review): setRetryBase(1) presumably sets the base of the retry back-off - confirm
// against the config class documentation.
122 socketClient.getConfig()->setRetry();
123 socketClient.getConfig()->setRetryBase(1);
124 socketClient.getConfig()->setReconnect();
125
// instanceName is captured by copy, so the callback does not refer to the caller's string.
126 socketClient.connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
127 reportState(instanceName, socketAddress, state);
128 });
129
130 return socketClient;
131}
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)

◆ startClient() [2/2]

template<typename HttpClient>
HttpClient startClient ( const std::string & name,
const std::function< void(typename HttpClient::Config *)> & configurator = nullptr )

Definition at line 134 of file mqttintegrator.cpp.

// Body of startClient() [HTTP-client overload]:
//   1. constructs an HttpClient named `name` with two callbacks: the first receives a
//      MasterRequest and issues an upgrade to "websocket" on "/ws" with the header
//      Sec-WebSocket-Protocol set to "mqtt"; the second only logs "Session ended",
//   2. calls the optional configurator (skipped when it compares equal to nullptr),
//   3. switches on retry and reconnect on the client's config,
//   4. starts connect() with a callback that forwards every state to reportState(),
//   5. returns the client by value.
134 {
135 using SocketAddress = typename HttpClient::SocketAddress;
136
137 const HttpClient httpClient(
138 name,
139 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
// The connection name is copied once so the nested callbacks can log it by value.
140 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
141
// Request header naming "mqtt" as the WebSocket protocol for the upgrade.
142 req->set("Sec-WebSocket-Protocol", "mqtt");
143
// upgrade() receives the target "/ws", the protocol "websocket" and three callbacks.
144 req->upgrade(
145 "/ws",
146 "websocket",
// First callback: logs whether starting the upgrade succeeded or failed.
147 [connectionName](bool success) {
148 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
149 << "mqtt" << ") start " << (success ? "success" : "failed");
150 },
// Second callback: receives request, response and a success flag; intentionally empty.
151 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
152 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
153 [[maybe_unused]] bool success) {
154 },
// Third callback: logs the parse-error message it is given.
155 [connectionName](const std::shared_ptr<web::http::client::Request>&, const std::string& message) {
156 VLOG(1) << connectionName << ": Request parse error: " << message;
157 });
158 },
// Second constructor callback: only logs; the request argument is unused.
159 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
160 VLOG(1) << "Session ended";
161 });
162
// The configurator is optional; it runs before the retry/reconnect setters below.
163 if (configurator != nullptr) {
164 configurator(httpClient.getConfig());
165 }
166
// NOTE(review): setRetryBase(1) presumably sets the base of the retry back-off - confirm
// against the config class documentation.
167 httpClient.getConfig()->setRetry();
168 httpClient.getConfig()->setRetryBase(1);
169 httpClient.getConfig()->setReconnect();
170
// `name` is captured by copy, so the callback does not refer to the caller's string.
171 httpClient.connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
172 reportState(name, socketAddress, state);
173 });
174
175 return httpClient;
176}