MQTTSuite
Loading...
Searching...
No Matches
mqttintegrator.cpp File Reference
#include "SocketContextFactory.h"
#include "config.h"
Include dependency graph for mqttintegrator.cpp:

Go to the source code of this file.

Functions

static void reportState (const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
template<typename HttpClient>
void startClient (const std::string &name, const std::function< void(typename HttpClient::Config &)> &configurator=nullptr)
int main (int argc, char *argv[])

Function Documentation

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 140 of file mqttintegrator.cpp.

// Entry point of the MQTT integrator.
//
// Steps, in order:
//  1. When built with LINK_WEBSOCKET_STATIC / LINK_SUBPROTOCOL_STATIC, link the
//     WebSocket client upgrade factory and the "mqtt" sub-protocol factory.
//  2. Add the --mqtt-mapping-file and --mqtt-session-store string options, then call
//     core::SNodeC::init() with the command line.
//  3. For every transport enabled at build time through a CONFIG_MQTTSUITE_INTEGRATOR_*
//     macro, create one client instance, apply its default configuration and call
//     connect(); the connect callback forwards the result to reportState().
//  4. Call core::SNodeC::start() and return its result.
//
// argc, argv - command line, passed unchanged to core::SNodeC::init().
// returns    - the value returned by core::SNodeC::start().
//
// Default ports: plain TCP instances use 1883, TLS instances use 8883 (the port
// registered for MQTT over TLS), WebSocket instances use 8080 and secure WebSocket
// instances use 8088.
140 {
141#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
142 web::websocket::client::SocketContextUpgradeFactory::link();
143#endif
144
145#ifdef LINK_SUBPROTOCOL_STATIC
146 web::websocket::client::SubProtocolFactorySelector::link("mqtt", mqttClientSubProtocolFactory);
147#endif
148
149 utils::Config::addStringOption("--mqtt-mapping-file", "MQTT mapping file (json format) for integration", "[path]");
150 utils::Config::addStringOption("--mqtt-session-store", "Path to file for the persistent session store", "[path]", "");
151
152 core::SNodeC::init(argc, argv);
153
// Read after init() so the value reflects the parsed command line; it is handed to every
// stream client below. The startClient() based WebSocket clients do not receive it.
154 std::string sessionStoreFileName = utils::Config::getStringOptionValue("--mqtt-session-store");
155
// --- MQTT over plain TCP, IPv4 ---
156#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4)
157 net::in::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
158 "in-mqtt",
159 [](auto& config) {
160 config.Remote::setPort(1883);
161
162 config.setRetry();
163 config.setRetryBase(1);
164 config.setReconnect();
165 config.setDisableNagleAlgorithm();
166 },
167 sessionStoreFileName)
168 .connect([](const auto& socketAddress, const core::socket::State& state) {
169 reportState("in-mqtt", socketAddress, state);
170 });
171#endif // CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4
172
// --- MQTT over TLS, IPv4 ---
173#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4)
174 net::in::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
175 "in-mqtts",
176 [](auto& config) {
// 8883 is the registered MQTT-over-TLS port; 1883 is the plaintext port.
177 config.Remote::setPort(8883);
178
179 config.setRetry();
180 config.setRetryBase(1);
181 config.setReconnect();
182 config.setDisableNagleAlgorithm();
183 },
184 sessionStoreFileName)
185 .connect([](const auto& socketAddress, const core::socket::State& state) {
186 reportState("in-mqtts", socketAddress, state);
187 });
188#endif
189
// --- MQTT over plain TCP, IPv6 ---
190#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6)
191 net::in6::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
192 "in6-mqtt",
193 [](auto& config) {
194 config.Remote::setPort(1883);
195
196 config.setRetry();
197 config.setRetryBase(1);
198 config.setReconnect();
199 config.setDisableNagleAlgorithm();
200 },
201 sessionStoreFileName)
202 .connect([](const auto& socketAddress, const core::socket::State& state) {
203 reportState("in6-mqtt", socketAddress, state);
204 });
205#endif
206
// --- MQTT over TLS, IPv6 ---
207#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6)
208 net::in6::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
209 "in6-mqtts",
210 [](auto& config) {
// 8883 is the registered MQTT-over-TLS port; 1883 is the plaintext port.
211 config.Remote::setPort(8883);
212
213 config.setRetry();
214 config.setRetryBase(1);
215 config.setReconnect();
216 config.setDisableNagleAlgorithm();
217 },
218 sessionStoreFileName)
219 .connect([](const auto& socketAddress, const core::socket::State& state) {
220 reportState("in6-mqtts", socketAddress, state);
221 });
222#endif
223
// --- MQTT over a Unix domain socket ---
224#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX)
225 net::un::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
226 "un-mqtt",
227 [](auto& config) {
228 config.Remote::setSunPath("/var/mqttbroker-un-mqtt");
229
230 config.setRetry();
231 config.setRetryBase(1);
232 config.setReconnect();
233 },
234 sessionStoreFileName)
235 .connect([](const auto& socketAddress, const core::socket::State& state) {
236 reportState("un-mqtt", socketAddress, state);
237 });
238#endif
239
// --- MQTT over TLS on a Unix domain socket ---
240#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS)
241 net::un::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
242 "un-mqtts",
243 [](auto& config) {
// NOTE(review): same socket path as the non-TLS "un-mqtt" instance above — confirm
// this is intended and not a copy of the plain-text default.
244 config.Remote::setSunPath("/var/mqttbroker-un-mqtt");
245
246 config.setRetry();
247 config.setRetryBase(1);
248 config.setReconnect();
249 },
250 sessionStoreFileName)
251 .connect([](const auto& socketAddress, const core::socket::State& state) {
252 reportState("un-mqtts", socketAddress, state);
253 });
254#endif
255
// --- MQTT over WebSocket: each variant is created and connected by startClient() ---
256#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
257 startClient<web::http::legacy::in::Client>("in-wsmqtt", [](auto& config) {
258 config.Remote::setPort(8080);
259
260 config.setRetry();
261 config.setRetryBase(1);
262 config.setReconnect();
263 config.setDisableNagleAlgorithm();
264 });
265#endif
266
267#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
268 startClient<web::http::tls::in::Client>("in-wsmqtts", [](auto& config) {
269 config.Remote::setPort(8088);
270
271 config.setRetry();
272 config.setRetryBase(1);
273 config.setReconnect();
274 config.setDisableNagleAlgorithm();
275 });
276#endif
277
278#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
279 startClient<web::http::legacy::in6::Client>("in6-wsmqtt", [](auto& config) {
280 config.Remote::setPort(8080);
281
282 config.setRetry();
283 config.setRetryBase(1);
284 config.setReconnect();
285 config.setDisableNagleAlgorithm();
286 });
287#endif
288
289#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
290 startClient<web::http::tls::in6::Client>("in6-wsmqtts", [](auto& config) {
291 config.Remote::setPort(8088);
292
293 config.setRetry();
294 config.setRetryBase(1);
295 config.setReconnect();
296 config.setDisableNagleAlgorithm();
297 });
298#endif
299
// The Unix-socket WebSocket variants set no remote path here.
300#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
301 startClient<web::http::legacy::un::Client>("un-wsmqtt", [](auto& config) {
302 config.setRetry();
303 config.setRetryBase(1);
304 config.setReconnect();
305 });
306#endif
307
308#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
309 startClient<web::http::tls::un::Client>("un-wsmqtts", [](auto& config) {
310 config.setRetry();
311 config.setRetryBase(1);
312 config.setReconnect();
313 });
314#endif
315
316 return core::SNodeC::start();
317}
mqtt::mqttbridge::websocket::SubProtocolFactory * mqttClientSubProtocolFactory()
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
void startClient(const std::string &name, const std::function< void(typename HttpClient::Config &)> &configurator=nullptr)

◆ reportState()

void reportState ( const std::string & instanceName,
const core::socket::SocketAddress & socketAddress,
const core::socket::State & state )
static

Definition at line 84 of file mqttintegrator.cpp.

// Logs the outcome of a connect attempt for one client instance.
//
// instanceName  - label written at the start of every log line.
// socketAddress - address the attempt refers to; rendered with toString() in every
//                 branch except DISABLED.
// state         - result to report; state.what() supplies the text for ERROR and FATAL.
//
// Every branch logs through VLOG(1). ERROR and FATAL emit the same message format.
84 {
85 switch (state) {
86 case core::socket::State::OK:
87 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
88 break;
89 case core::socket::State::DISABLED:
// No address is printed for a disabled instance.
90 VLOG(1) << instanceName << ": disabled";
91 break;
92 case core::socket::State::ERROR:
93 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
94 break;
95 case core::socket::State::FATAL:
96 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
97 break;
98 }
99}

◆ startClient()

template<typename HttpClient>
void startClient ( const std::string & name,
const std::function< void(typename HttpClient::Config &)> & configurator = nullptr )

Definition at line 102 of file mqttintegrator.cpp.

// Creates an HttpClient instance called `name`, optionally configures it, and connects it.
//
// HttpClient   - client type; must provide the nested types Config and SocketAddress,
//                a constructor taking (name, callback, callback), getConfig() and connect().
// name         - instance name passed to the HttpClient constructor and used as the
//                label in reportState() output.
// configurator - optional callback; when non-null it is invoked with
//                httpClient.getConfig() before connect() is called.
//
// The first constructor callback receives a MasterRequest, sets the header
// "Sec-WebSocket-Protocol: mqtt" and requests an upgrade to "websocket" on "/ws".
// The second constructor callback only logs "Session ended".
102 {
103 using SocketAddress = typename HttpClient::SocketAddress;
104
105 const HttpClient httpClient(
106 name,
107 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
// Copied into a local so the nested callbacks can capture it by value.
108 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
109
110 req->set("Sec-WebSocket-Protocol", "mqtt");
111
// upgrade() takes the target path, the protocol name and three callbacks, in this order:
// one that logs whether starting the upgrade succeeded, one that receives
// (request, response, success) and is intentionally empty, and one that logs a
// request parse error message.
112 req->upgrade(
113 "/ws",
114 "websocket",
115 [connectionName](bool success) {
116 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
117 << "mqtt" << ") start " << (success ? "success" : "failed");
118 },
119 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
120 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
121 [[maybe_unused]] bool success) {
122 },
123 [connectionName](const std::shared_ptr<web::http::client::Request>&, const std::string& message) {
124 VLOG(1) << connectionName << ": Request parse error: " << message;
125 });
126 },
127 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
128 VLOG(1) << "Session ended";
129 });
130
// The default argument is nullptr, so the configurator is only called when one was supplied.
131 if (configurator != nullptr) {
132 configurator(httpClient.getConfig());
133 }
134
// `name` is captured by value, so the callback does not refer to the caller's string.
135 httpClient.connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
136 reportState(name, socketAddress, state);
137 });
138}