MQTTSuite
Loading...
Searching...
No Matches
mqttintegrator.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 * Tobias Pfeil
6 * 2025, 2026
7 *
8 * This program is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the Free
10 * Software Foundation, either version 3 of the License, or (at your option)
11 * any later version.
12 *
13 * This program is distributed in the hope that it will be useful, but WITHOUT
14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
15 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program. If not, see <https://www.gnu.org/licenses/>.
20 */
21
22/*
23 * MIT License
24 *
25 * Permission is hereby granted, free of charge, to any person obtaining a copy
26 * of this software and associated documentation files (the "Software"), to deal
27 * in the Software without restriction, including without limitation the rights
28 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
29 * copies of the Software, and to permit persons to whom the Software is
30 * furnished to do so, subject to the following conditions:
31 *
32 * The above copyright notice and this permission notice shall be included in
33 * all copies or substantial portions of the Software.
34 *
35 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
36 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
37 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
38 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
39 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
40 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
41 * THE SOFTWARE.
42 */
43
44#include "SocketContextFactory.h" // IWYU pragma: keep
45#include "config.h"
46#include "lib/ConfigApplication.h"
47
48#ifdef LINK_SUBPROTOCOL_STATIC
49
50#include "websocket/SubProtocolFactory.h"
51
52#include <web/websocket/client/SubProtocolFactorySelector.h>
53
54#endif
55
56#if defined(LINK_WEBSOCKET_STATIC) || defined(LINK_SUBPROTOCOL_STATIC)
57
58#include <web/websocket/client/SocketContextUpgradeFactory.h>
59
60#endif
61
62#include <core/SNodeC.h>
63#include <utils/Config.h>
64//
65#include <net/in/stream/legacy/SocketClient.h>
66#include <net/in/stream/tls/SocketClient.h>
67#include <net/in6/stream/legacy/SocketClient.h>
68#include <net/in6/stream/tls/SocketClient.h>
69#include <net/un/stream/legacy/SocketClient.h>
70#include <net/un/stream/tls/SocketClient.h>
71#include <web/http/legacy/in/Client.h>
72#include <web/http/legacy/in6/Client.h>
73#include <web/http/legacy/un/Client.h>
74#include <web/http/tls/in/Client.h>
75#include <web/http/tls/in6/Client.h>
76#include <web/http/tls/un/Client.h>
77//
78#include <express/legacy/in/Server.h>
79#include <express/tls/in/Server.h>
80//
81
82#ifndef DOXYGEN_SHOULD_SKIP_THIS
83
84#include <log/Logger.h>
85//
86#include <utility>
87
88#endif
89
90// admin API
91#include "lib/MappingAdminRouter.h"
92#include "lib/Mqtt.h"
93
94static void
95reportState(const std::string& instanceName, const core::socket::SocketAddress& socketAddress, const core::socket::State& state) {
96 switch (state) {
97 case core::socket::State::OK:
98 VLOG(1) << instanceName << ": connected to '" << socketAddress.toString() << "'";
99 break;
100 case core::socket::State::DISABLED:
101 VLOG(1) << instanceName << ": disabled";
102 break;
103 case core::socket::State::ERROR:
104 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
105 break;
106 case core::socket::State::FATAL:
107 VLOG(1) << instanceName << ": " << socketAddress.toString() << ": " << state.what();
108 break;
109 }
110}
111
112template <template <typename SocketContextFactoryT, typename... ArgsT> typename SocketClientT, typename... Args>
113static SocketClientT<mqtt::mqttintegrator::SocketContextFactory, Args...>
114startClient(const std::string& instanceName,
115 const std::function<void(typename SocketClientT<mqtt::mqttintegrator::SocketContextFactory>::Config*)>& configurator,
116 Args&&... args) {
117 using Client = SocketClientT<mqtt::mqttintegrator::SocketContextFactory, Args...>;
118 using SocketAddress = typename Client::SocketAddress;
119
120 Client socketClient = core::socket::stream::Client<Client>(instanceName, configurator, std::forward<Args>(args)...);
121
122 socketClient.getConfig()->setRetry();
123 socketClient.getConfig()->setRetryBase(1);
124 socketClient.getConfig()->setReconnect();
125
126 socketClient.connect([instanceName](const SocketAddress& socketAddress, const core::socket::State& state) {
127 reportState(instanceName, socketAddress, state);
128 });
129
130 return socketClient;
131}
132
133template <typename HttpClient>
134HttpClient startClient(const std::string& name, const std::function<void(typename HttpClient::Config*)>& configurator = nullptr) {
135 using SocketAddress = typename HttpClient::SocketAddress;
136
137 const HttpClient httpClient(
138 name,
139 [](const std::shared_ptr<web::http::client::MasterRequest>& req) {
140 const std::string connectionName = req->getSocketContext()->getSocketConnection()->getConnectionName();
141
142 req->set("Sec-WebSocket-Protocol", "mqtt");
143
144 req->upgrade(
145 "/ws",
146 "websocket",
147 [connectionName](bool success) {
148 VLOG(1) << connectionName << ": HTTP Upgrade (http -> websocket||"
149 << "mqtt" << ") start " << (success ? "success" : "failed");
150 },
151 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req,
152 [[maybe_unused]] const std::shared_ptr<web::http::client::Response>& res,
153 [[maybe_unused]] bool success) {
154 },
155 [connectionName](const std::shared_ptr<web::http::client::Request>&, const std::string& message) {
156 VLOG(1) << connectionName << ": Request parse error: " << message;
157 });
158 },
159 []([[maybe_unused]] const std::shared_ptr<web::http::client::Request>& req) {
160 VLOG(1) << "Session ended";
161 });
162
163 if (configurator != nullptr) {
164 configurator(httpClient.getConfig());
165 }
166
167 httpClient.getConfig()->setRetry();
168 httpClient.getConfig()->setRetryBase(1);
169 httpClient.getConfig()->setReconnect();
170
171 httpClient.connect([name](const SocketAddress& socketAddress, const core::socket::State& state) {
172 reportState(name, socketAddress, state);
173 });
174
175 return httpClient;
176}
177
178int main(int argc, char* argv[]) {
179 mqtt::lib::ConfigMqttIntegrator* configMqttIntegrator = utils::Config::configRoot.newSubCommand<mqtt::lib::ConfigMqttIntegrator>();
180
181 core::SNodeC::init(argc, argv);
182
183 // Instanciate Admin Router for Mapping Management
184 express::Router router =
185 mqtt::lib::admin::makeMappingAdminRouter(configMqttIntegrator, mqtt::lib::admin::AdminOptions{}, [](bool mustReconnect) {
186 return mqtt::mqttintegrator::lib::Mqtt::updateSubscriptions(mustReconnect);
187 });
188
189 express::legacy::in::Server("in-http", router, reportState, [](net::in::stream::legacy::config::ConfigSocketServer* config) {
190 config->setPort(8085);
191 config->setRetry();
192 });
193
194 express::tls::in::Server("in-https", router, reportState, [](net::in::stream::tls::config::ConfigSocketServer* config) {
195 config->setPort(8086);
196 config->setRetry();
197 });
198
199#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4)
200 startClient<net::in::stream::legacy::SocketClient>( //
201 "in-mqtt",
202 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
203 config->Remote::setPort(1883);
204
205 config->setDisableNagleAlgorithm();
206 });
207#endif // CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4
208
209#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4)
210 startClient<net::in::stream::tls::SocketClient>( //
211 "in-mqtts",
212 [](net::in::stream::tls::config::ConfigSocketClient* config) {
213 config->Remote::setPort(1883);
214 config->setDisableNagleAlgorithm();
215 });
216#endif
217
218#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6)
219 startClient<net::in6::stream::legacy::SocketClient>( //
220 "in6-mqtt",
221 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
222 config->Remote::setPort(1883);
223 config->setDisableNagleAlgorithm();
224 });
225#endif
226
227#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6)
228 startClient<net::in6::stream::tls::SocketClient>( //
229 "in6-mqtts",
230 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
231 config->Remote::setPort(1883);
232 config->setDisableNagleAlgorithm();
233 });
234#endif
235
236#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX)
237 startClient<net::un::stream::legacy::SocketClient>( //
238 "un-mqtt",
239 []([[maybe_unused]] const net::un::stream::legacy::config::ConfigSocketClient* config) {
240 });
241#endif
242
243#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS)
244 startClient<net::un::stream::tls::SocketClient>( //
245 "un-mqtts",
246 []([[maybe_unused]] const net::un::stream::tls::config::ConfigSocketClient* config) {
247 });
248#endif
249
250#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
251 startClient<web::http::legacy::in::Client>( //
252 "in-wsmqtt",
253 [](net::in::stream::legacy::config::ConfigSocketClient* config) {
254 config->Remote::setPort(8080);
255 config->setDisableNagleAlgorithm();
256 });
257#endif
258
259#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV4) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
260 startClient<web::http::tls::in::Client>( //
261 "in-wsmqtts",
262 [](net::in::stream::tls::config::ConfigSocketClient* config) {
263 config->Remote::setPort(8088);
264 config->setDisableNagleAlgorithm();
265 });
266#endif
267
268#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TCP_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
269 startClient<web::http::legacy::in6::Client>( //
270 "in6-wsmqtt",
271 [](net::in6::stream::legacy::config::ConfigSocketClient* config) {
272 config->Remote::setPort(8080);
273 config->setDisableNagleAlgorithm();
274 });
275#endif
276
277#if defined(CONFIG_MQTTSUITE_INTEGRATOR_TLS_IPV6) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
278 startClient<web::http::tls::in6::Client>( //
279 "in6-wsmqtts",
280 [](net::in6::stream::tls::config::ConfigSocketClient* config) {
281 config->Remote::setPort(8088);
282 config->setDisableNagleAlgorithm();
283 });
284#endif
285
286#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WS)
287 startClient<web::http::legacy::un::Client>( //
288 "un-wsmqtt",
289 []([[maybe_unused]] const net::un::stream::legacy::config::ConfigSocketClient* config) {
290 });
291#endif
292
293#if defined(CONFIG_MQTTSUITE_INTEGRATOR_UNIX_TLS) && defined(CONFIG_MQTTSUITE_INTEGRATOR_WSS)
294 startClient<web::http::tls::un::Client>( //
295 "un-wsmqtts",
296 []([[maybe_unused]] const net::un::stream::tls::config::ConfigSocketClient* config) {
297 });
298#endif
299
300 return core::SNodeC::start();
301}
static mqtt::lib::admin::ReloadResult updateSubscriptions(bool mustReconnect)
Definition Mqtt.cpp:89
int main(int argc, char *argv[])
static SocketClientT< mqtt::mqttintegrator::SocketContextFactory, Args... > startClient(const std::string &instanceName, const std::function< void(typename SocketClientT< mqtt::mqttintegrator::SocketContextFactory >::Config *)> &configurator, Args &&... args)
static void reportState(const std::string &instanceName, const core::socket::SocketAddress &socketAddress, const core::socket::State &state)
HttpClient startClient(const std::string &name, const std::function< void(typename HttpClient::Config *)> &configurator=nullptr)