MQTTSuite
Loading...
Searching...
No Matches
mqttintegrator.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "SocketContextFactory.h" // IWYU pragma: keep
43#include "config.h"
44
45#ifdef LINK_SUBPROTOCOL_STATIC
46
47#include "websocket/SubProtocolFactory.h"
48
49#include <web/websocket/client/SubProtocolFactorySelector.h>
50
51#endif
52
53#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
54
55#include <web/websocket/client/SocketContextUpgradeFactory.h>
56
57#endif
58
59#ifndef DOXYGEN_SHOULD_SKIP_THIS
60
61#include <core/SNodeC.h>
62//
63#include <net/in/stream/legacy/SocketClient.h>
64#include <net/in/stream/tls/SocketClient.h>
65#include <net/in6/stream/legacy/SocketClient.h>
66#include <net/in6/stream/tls/SocketClient.h>
67#include <net/un/stream/legacy/SocketClient.h>
68#include <net/un/stream/tls/SocketClient.h>
69#include <web/http/legacy/in/Client.h>
70#include <web/http/legacy/in6/Client.h>
71#include <web/http/legacy/un/Client.h>
72#include <web/http/tls/in/Client.h>
73#include <web/http/tls/in6/Client.h>
74#include <web/http/tls/un/Client.h>
75//
76#include <log/Logger.h>
77#include <utils/Config.h>
78//
79#include <string>
80
81#endif
82
83static void
84reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
85 switch (state) {
86 case core::socket::State::OK:
87 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
88 break;
89 case core::socket::State::DISABLED:
90 VLOG(1) << instanceName << ": disabled";
91 break;
92 case core::socket::State::ERROR:
93 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
94 break;
95 case core::socket::State::FATAL:
96 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
97 break;
98 }
99}
100
101template <typename HttpClient>
102void startClient(const std::string& name, const std::function<void(typename HttpClient::Config&)>& configurator = nullptr) {
103 using SocketAddress = typename HttpClient::SocketAddress;
104
105 const HttpClient httpClient(
106 name,
107 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
108 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
109
110 req->set("Sec-WebSocket-Protocol", "mqtt");
111
112 req->upgrade(
113 "/ws",
114 "websocket",
115 [connectionName](bool success) {
116 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
117 << "mqtt" << ") start " << (success ? "success" : "failed");
118 },
119 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
120 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
121 [[maybe_unused]] bool success) {
122 },
123 [connectionName](const std::shared_ptr<web::http::client::Request>&, const std::string& message) {
124 VLOG(1) << connectionName << ": Request parse error: " << message;
125 });
126 },
127 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
128 VLOG(1) << "Session ended";
129 });
130
131 if (configurator != nullptr) {
132 configurator(httpClient.getConfig());
133 }
134
135 httpClient.connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
136 reportState(name, socketAddress, state);
137 });
138}
139
140int main(int argc, char* argv[]) {
141#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
142 web::websocket::client::SocketContextUpgradeFactory::link();
143#endif
144
145#ifdef LINK_SUBPROTOCOL_STATIC
146 web::websocket::client::SubProtocolFactorySelector::link("mqtt", mqttClientSubProtocolFactory);
147#endif
148
149 utils::Config::addStringOption("--mqtt-mapping-file", "MQTT mapping file (json format) for integration", "[path]");
150 utils::Config::addStringOption("--mqtt-session-store", "Path to file for the persistent session store", "[path]", "");
151
152 core::SNodeC::init(argc, argv);
153
154 std::string sessionStoreFileName = utils::Config::getStringOptionValue("--mqtt-session-store");
155
156#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4)
157 net::in::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
158 "in-mqtt",
159 [](auto& config) {
160 config.Remote::setPort(1883);
161
162 config.setRetry();
163 config.setRetryBase(1);
164 config.setReconnect();
165 config.setDisableNagleAlgorithm();
166 },
167 sessionStoreFileName)
168 .connect([](const auto& socketAddress, const core::socket::State& state) {
169 reportState("in-mqtt", socketAddress, state);
170 });
171#endif // CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4
172
173#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4)
174 net::in::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
175 "in-mqtts",
176 [](auto& config) {
177 config.Remote::setPort(1883);
178
179 config.setRetry();
180 config.setRetryBase(1);
181 config.setReconnect();
182 config.setDisableNagleAlgorithm();
183 },
184 sessionStoreFileName)
185 .connect([](const auto& socketAddress, const core::socket::State& state) {
186 reportState("in-mqtts", socketAddress, state);
187 });
188#endif
189
190#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6)
191 net::in6::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
192 "in6-mqtt",
193 [](auto& config) {
194 config.Remote::setPort(1883);
195
196 config.setRetry();
197 config.setRetryBase(1);
198 config.setReconnect();
199 config.setDisableNagleAlgorithm();
200 },
201 sessionStoreFileName)
202 .connect([](const auto& socketAddress, const core::socket::State& state) {
203 reportState("in6-mqtt", socketAddress, state);
204 });
205#endif
206
207#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6)
208 net::in6::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
209 "in6-mqtts",
210 [](auto& config) {
211 config.Remote::setPort(1883);
212
213 config.setRetry();
214 config.setRetryBase(1);
215 config.setReconnect();
216 config.setDisableNagleAlgorithm();
217 },
218 sessionStoreFileName)
219 .connect([](const auto& socketAddress, const core::socket::State& state) {
220 reportState("in6-mqtts", socketAddress, state);
221 });
222#endif
223
224#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX)
225 net::un::stream::legacy::Client<mqtt::mqttintegrator::SocketContextFactory>(
226 "un-mqtt",
227 [](auto& config) {
228 config.Remote::setSunPath("/var/mqttbroker-un-mqtt");
229
230 config.setRetry();
231 config.setRetryBase(1);
232 config.setReconnect();
233 },
234 sessionStoreFileName)
235 .connect([](const auto& socketAddress, const core::socket::State& state) {
236 reportState("un-mqtt", socketAddress, state);
237 });
238#endif
239
240#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS)
241 net::un::stream::tls::Client<mqtt::mqttintegrator::SocketContextFactory>(
242 "un-mqtts",
243 [](auto& config) {
244 config.Remote::setSunPath("/var/mqttbroker-un-mqtt");
245
246 config.setRetry();
247 config.setRetryBase(1);
248 config.setReconnect();
249 },
250 sessionStoreFileName)
251 .connect([](const auto& socketAddress, const core::socket::State& state) {
252 reportState("un-mqtts", socketAddress, state);
253 });
254#endif
255
256#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
257 startClient<web::http::legacy::in::Client>("in-wsmqtt", [](auto& config) {
258 config.Remote::setPort(8080);
259
260 config.setRetry();
261 config.setRetryBase(1);
262 config.setReconnect();
263 config.setDisableNagleAlgorithm();
264 });
265#endif
266
267#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
268 startClient<web::http::tls::in::Client>("in-wsmqtts", [](auto& config) {
269 config.Remote::setPort(8088);
270
271 config.setRetry();
272 config.setRetryBase(1);
273 config.setReconnect();
274 config.setDisableNagleAlgorithm();
275 });
276#endif
277
278#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
279 startClient<web::http::legacy::in6::Client>("in6-wsmqtt", [](auto& config) {
280 config.Remote::setPort(8080);
281
282 config.setRetry();
283 config.setRetryBase(1);
284 config.setReconnect();
285 config.setDisableNagleAlgorithm();
286 });
287#endif
288
289#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
290 startClient<web::http::tls::in6::Client>("in6-wsmqtts", [](auto& config) {
291 config.Remote::setPort(8088);
292
293 config.setRetry();
294 config.setRetryBase(1);
295 config.setReconnect();
296 config.setDisableNagleAlgorithm();
297 });
298#endif
299
300#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
301 startClient<web::http::legacy::un::Client>("un-wsmqtt", [](auto& config) {
302 config.setRetry();
303 config.setRetryBase(1);
304 config.setReconnect();
305 });
306#endif
307
308#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
309 startClient<web::http::tls::un::Client>("un-wsmqtts", [](auto& config) {
310 config.setRetry();
311 config.setRetryBase(1);
312 config.setReconnect();
313 });
314#endif
315
316 return core::SNodeC::start();
317}
int main(int argc, char *argv[])
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
void startClient(const std::string &name, const std::function< void(typename HttpClient::Config &)> &configurator=nullptr)