MQTTSuite
Loading...
Searching...
No Matches
MqttMapper.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "MqttMapper.h"
43
45
46#include <cmath>
47#include <core/DynamicLoader.h>
48#include <iot/mqtt/Topic.h>
49#include <iot/mqtt/packets/Publish.h>
50
51#ifndef DOXYGEN_SHOULD_SKIP_THIS
52
53#ifdef __GNUC__
54#pragma GCC diagnostic push
55#ifdef __has_warning
56#if __has_warning("-Wcovered-switch-default")
57#pragma GCC diagnostic ignored "-Wcovered-switch-default"
58#endif
59#if __has_warning("-Wnrvo")
60#pragma GCC diagnostic ignored "-Wnrvo"
61#endif
62#endif
63#endif
64#include "inja.hpp"
65#ifdef __GNUC_
66#pragma GCC diagnostic pop
67#endif
68
69#include <algorithm>
70#include <dlfcn.h>
71#include <log/Logger.h>
72#include <map>
73#include <nlohmann/json.hpp>
74#include <vector>
75
76#endif
77
78// IWYU pragma: no_include <nlohmann/detail/iterators/iter_impl.hpp>
79
80namespace mqtt::lib {
81
// Construct a mapper for the given mapping description and, if the
// description has a "plugins" entry, load every listed plugin and register
// the inja callbacks it exports.
//
// mappingJson: the mapping description; it is stored in the member of the
//              same name.
82 MqttMapper::MqttMapper(const nlohmann::json& mappingJson)
83 : mappingJson(mappingJson) {
// NOTE(review): listing line 84 is missing from this extraction.
// injaEnvironment is dereferenced below, so confirm against the repository
// where it is created.
85
86 if (mappingJson.contains("plugins")) {
87 VLOG(1) << "Loading plugins ...";
88
// Each entry of "plugins" is converted to a string and handed to the
// dynamic loader.
89 for (const nlohmann::json& pluginJson : mappingJson["plugins"]) {
90 const std::string plugin = pluginJson;
91
92 void* handle = core::DynamicLoader::dlOpen(plugin);
93
94 if (handle != nullptr) {
// Remember the handle so it can be closed later via pluginHandles.
95 pluginHandles.push_back(handle);
96
97 VLOG(1) << " Loading plugin: " << plugin << " ...";
98
// Look up the exported symbol "functions" and treat it as a vector of
// mqtt::lib::Function descriptors; a null result means the plugin does not
// export it.
99 const std::vector<mqtt::lib::Function>* loadedFunctions =
100 static_cast<std::vector<mqtt::lib::Function>*>(dlsym(handle, "functions"));
101 if (loadedFunctions != nullptr) {
102 VLOG(1) << " Registering inja 'none void callbacks'";
103 for (const mqtt::lib::Function& function : *loadedFunctions) {
104 VLOG(1) << " " << function.name;
105
// A non-negative numArgs is passed on as the explicit argument count;
// otherwise the overload without an argument count is used.
106 if (function.numArgs >= 0) {
107 injaEnvironment->add_callback(function.name, function.numArgs, function.function);
108 } else {
109 injaEnvironment->add_callback(function.name, function.function);
110 }
111 }
112 VLOG(1) << " Registering inja 'none void callbacks done'";
113 } else {
114 VLOG(1) << " No inja none 'void callbacks found' in plugin " << plugin;
115 }
116
// Same procedure for the exported symbol "voidFunctions", registered through
// add_void_callback.
117 const std::vector<mqtt::lib::VoidFunction>* loadedVoidFunctions =
118 static_cast<std::vector<mqtt::lib::VoidFunction>*>(dlsym(handle, "voidFunctions"));
119 if (loadedVoidFunctions != nullptr) {
120 VLOG(1) << " Registering inja 'void callbacks'";
121 for (const mqtt::lib::VoidFunction& voidFunction : *loadedVoidFunctions) {
122 VLOG(1) << " " << voidFunction.name;
123
124 if (voidFunction.numArgs >= 0) {
125 injaEnvironment->add_void_callback(voidFunction.name, voidFunction.numArgs, voidFunction.function);
126 } else {
127 injaEnvironment->add_void_callback(voidFunction.name, voidFunction.function);
128 }
129 }
130 VLOG(1) << " Registering inja 'void callbacks' done";
131 } else {
132 VLOG(1) << " No inja 'void callbacks' found in plugin " << plugin;
133 }
134
135 VLOG(1) << " Loading plugin done: " << plugin;
136 } else {
// A plugin that cannot be opened is only logged; the loop continues with the
// next entry.
137 VLOG(1) << " Error loading plugin: " << plugin;
138 }
139 }
140
141 VLOG(1) << "Loading plugins done";
142 }
143 }
144
146 delete injaEnvironment;
147
148 for (void* pluginHandle : pluginHandles) {
149 core::DynamicLoader::dlClose(pluginHandle);
150 }
151 }
152
153 std::string MqttMapper::dump() {
154 return mappingJson.dump();
155 }
156
// Build and return the list of topics for this mapper.
157 std::list<iot::mqtt::Topic> MqttMapper::extractSubscriptions() {
158 std::list<iot::mqtt::Topic> topicList;
159
// NOTE(review): listing line 160 is missing from this extraction. As shown,
// topicList is returned without being filled; presumably the missing line
// populates it from mappingJson - confirm against the repository.
161
162 return topicList;
163 }
164
165 void MqttMapper::publishMappings(const iot::mqtt::packets::Publish& publish) {
166 if (!mappingJson.empty()) {
167 nlohmann::json matchingTopicLevel = findMatchingTopicLevel(mappingJson["topic_level"], publish.getTopic());
168
169 if (!matchingTopicLevel.empty()) {
170 const nlohmann::json& subscription = matchingTopicLevel["subscription"];
171
172 if (subscription.contains("static")) {
173 VLOG(1) << "Topic mapping found for:";
174 VLOG(1) << " Type: static";
175 VLOG(1) << " Topic: " << publish.getTopic();
176 VLOG(1) << " Message: " << publish.getMessage();
177 VLOG(1) << " QoS: " << static_cast<uint16_t>(publish.getQoS());
178 VLOG(1) << " Retain: " << publish.getRetain();
179
180 publishMappedMessages(subscription["static"], publish);
181 }
182
183 if (subscription.contains("value")) {
184 VLOG(1) << "Topic mapping found for:";
185 VLOG(1) << " Type: value";
186 VLOG(1) << " Topic: " << publish.getTopic();
187 VLOG(1) << " Message: " << publish.getMessage();
188 VLOG(1) << " QoS: " << static_cast<uint16_t>(publish.getQoS());
189 VLOG(1) << " Retain: " << publish.getRetain();
190
191 nlohmann::json json;
192 json["message"] = publish.getMessage();
193
194 publishMappedTemplates(subscription["value"], json, publish);
195 }
196
197 if (subscription.contains("json")) {
198 VLOG(1) << "Topic mapping found for:";
199 VLOG(1) << " Type: json";
200 VLOG(1) << " Topic: " << publish.getTopic();
201 VLOG(1) << " Message: " << publish.getMessage();
202 VLOG(1) << " QoS: " << static_cast<uint16_t>(publish.getQoS());
203 VLOG(1) << " Retain: " << publish.getRetain();
204
205 try {
206 nlohmann::json json;
207 json["message"] = nlohmann::json::parse(publish.getMessage());
208
209 publishMappedTemplates(subscription["json"], json, publish);
210 } catch (const nlohmann::json::parse_error& e) {
211 VLOG(1) << " Parsing message into json failed: " << publish.getMessage();
212 VLOG(1) << " What: " << e.what() << '\n'
213 << " Exception Id: " << e.id << '\n'
214 << " Byte position of error: " << e.byte;
215 }
216 }
217 }
218 }
219 }
220
221 void MqttMapper::extractSubscription(const nlohmann::json& topicLevelJson,
222 const std::string& topic,
223 std::list<iot::mqtt::Topic>& topicList) {
224 const std::string name = topicLevelJson["name"];
225
226 if (topicLevelJson.contains("subscription")) {
227 const uint8_t qoS = topicLevelJson["subscription"]["qos"];
228
229 topicList.emplace_back(topic + ((topic.empty() || topic == "/") && !name.empty() ? "" : "/") + name, qoS);
230 }
231
232 if (topicLevelJson.contains("topic_level")) {
233 extractSubscriptions(topicLevelJson, topic + ((topic.empty() || topic == "/") && !name.empty() ? "" : "/") + name, topicList);
234 }
235 }
236
237 void
238 MqttMapper::extractSubscriptions(const nlohmann::json& mappingJson, const std::string& topic, std::list<iot::mqtt::Topic>& topicList) {
239 const nlohmann::json& topicLevels = mappingJson["topic_level"];
240
241 if (topicLevels.is_object()) {
242 extractSubscription(topicLevels, topic, topicList);
243 } else {
244 for (const nlohmann::json& topicLevel : topicLevels) {
245 extractSubscription(topicLevel, topic, topicList);
246 }
247 }
248 }
249
250 nlohmann::json MqttMapper::findMatchingTopicLevel(const nlohmann::json& topicLevel, const std::string& topic) {
251 nlohmann::json foundTopicLevel;
252
253 if (topicLevel.is_object()) {
254 const std::string::size_type slashPosition = topic.find('/');
255 const std::string topicLevelName = topic.substr(0, slashPosition);
256
257 if (topicLevel["name"] == topicLevelName || topicLevel["name"] == "+" || topicLevel["name"] == "#") {
258 if (slashPosition == std::string::npos) {
259 foundTopicLevel = topicLevel;
260 } else if (topicLevel.contains("topic_level")) {
261 foundTopicLevel = findMatchingTopicLevel(topicLevel["topic_level"], topic.substr(slashPosition + 1));
262 }
263 }
264 } else if (topicLevel.is_array()) {
265 for (const nlohmann::json& topicLevelEntry : topicLevel) {
266 foundTopicLevel = findMatchingTopicLevel(topicLevelEntry, topic);
267
268 if (!foundTopicLevel.empty()) {
269 break;
270 }
271 }
272 }
273
274 return foundTopicLevel;
275 }
276
277 void MqttMapper::publishMappedTemplate(const nlohmann::json& templateMapping, nlohmann::json& json) {
278 const std::string& mappingTemplate = templateMapping["mapping_template"];
279 const std::string& mappedTopic = templateMapping["mapped_topic"];
280
281 try {
282 // Render topic
283 const std::string renderedTopic = injaEnvironment->render(mappedTopic, json);
284 json["mapped_topic"] = renderedTopic;
285
286 VLOG(1) << " Mapped topic template: " << mappedTopic;
287 VLOG(1) << " -> " << renderedTopic;
288
289 try {
290 // Render message
291 const std::string renderedMessage = injaEnvironment->render(mappingTemplate, json);
292 VLOG(1) << " Mapped message template: " << mappingTemplate;
293 VLOG(1) << " -> " << renderedMessage;
294
295 const nlohmann::json& suppressions = templateMapping["suppressions"];
296 const bool retain = templateMapping["retain"];
297
298 if (suppressions.empty() || std::find(suppressions.begin(), suppressions.end(), renderedMessage) == suppressions.end() ||
299 (retain && renderedMessage.empty())) {
300 const uint8_t qoS = templateMapping["qos"];
301
302 VLOG(1) << " Send mapping:";
303 VLOG(1) << " Topic: " << renderedTopic;
304 VLOG(1) << " Message: " << renderedMessage << "";
305 VLOG(1) << " QoS: " << static_cast<int>(qoS);
306 VLOG(1) << " retain: " << retain;
307
308 publishMapping(renderedTopic, renderedMessage, qoS, retain);
309 } else {
310 VLOG(1) << " Rendered message: '" << renderedMessage << "' in suppression list:";
311 for (const nlohmann::json& item : suppressions) {
312 VLOG(1) << " '" << item.get<std::string>() << "'";
313 }
314 VLOG(1) << " Send mapping: suppressed";
315 }
316 } catch (const inja::InjaError& e) {
317 VLOG(1) << " Message template rendering failed: " << mappingTemplate << " : " << json.dump();
318 VLOG(1) << " What: " << e.what();
319 VLOG(1) << " INJA: " << e.type << ": " << e.message;
320 VLOG(1) << " INJA (line:column):" << e.location.line << ":" << e.location.column;
321 }
322 } catch (const inja::InjaError& e) {
323 VLOG(1) << " Topic template rendering failed: " << mappingTemplate << " : " << json.dump();
324 VLOG(1) << " What: " << e.what();
325 VLOG(1) << " INJA: " << e.type << ": " << e.message;
326 VLOG(1) << " INJA (line:column):" << e.location.line << ":" << e.location.column;
327 }
328 }
329
330 void MqttMapper::publishMappedTemplates(const nlohmann::json& templateMapping,
331 nlohmann::json& json,
332 const iot::mqtt::packets::Publish& publish) {
333 json["topic"] = publish.getTopic();
334 json["qos"] = publish.getQoS();
335 json["retain"] = publish.getRetain();
336 json["package_identifier"] = publish.getPacketIdentifier();
337
338 try {
339 VLOG(1) << " Render data: " << json.dump();
340
341 if (templateMapping.is_object()) {
342 publishMappedTemplate(templateMapping, json);
343 } else {
344 for (const nlohmann::json& concreteTemplateMapping : templateMapping) {
345 publishMappedTemplate(concreteTemplateMapping, json);
346 }
347 }
348 } catch (const nlohmann::json::exception& e) {
349 VLOG(1) << "JSON Exception during Render data:\n" << e.what();
350 }
351 }
352
353 void MqttMapper::publishMappedMessage(const std::string& topic, const std::string& message, uint8_t qoS, bool retain) {
354 VLOG(1) << " Mapped topic:";
355 VLOG(1) << " -> " << topic;
356 VLOG(1) << " Mapped message:";
357 VLOG(1) << " -> " << message;
358 VLOG(1) << " Send mapping:";
359 VLOG(1) << " Topic: " << topic;
360 VLOG(1) << " Message: " << message;
361 VLOG(1) << " QoS: " << static_cast<int>(qoS);
362 VLOG(1) << " retain: " << retain;
363
364 publishMapping(topic, message, qoS, retain);
365 }
366
// Apply one static mapping to a received publish: find the "message_mapping"
// entry whose "message" equals the received message and publish its
// "mapped_message" on the mapping's "mapped_topic".
//
// staticMapping: one static mapping object (read: "message_mapping",
//                "mapped_topic", "qos", "retain").
// publish:       the received publish packet.
367 void MqttMapper::publishMappedMessage(const nlohmann::json& staticMapping, const iot::mqtt::packets::Publish& publish) {
368 const nlohmann::json& messageMapping = staticMapping["message_mapping"];
369
370 VLOG(1) << " Message mapping: " << messageMapping.dump();
371
// "message_mapping" is either a single object or a collection of objects.
372 if (messageMapping.is_object()) {
373 if (messageMapping["message"] == publish.getMessage()) {
// NOTE(review): listing line 374 is missing from this extraction; presumably
// it is the head of the publishMappedMessage( call whose arguments follow -
// confirm against the repository.
375 staticMapping["mapped_topic"], messageMapping["mapped_message"], staticMapping["qos"], staticMapping["retain"]);
376 } else {
377 VLOG(1) << " no matching mapped message found";
378 }
379 } else {
// Search the collection for the first candidate whose "message" equals the
// received message.
380 const nlohmann::json::const_iterator matchedMessageMappingIterator =
381 std::find_if(messageMapping.begin(), messageMapping.end(), [&publish](const nlohmann::json& messageMappingCandidat) {
382 return messageMappingCandidat["message"] == publish.getMessage();
383 });
384
385 if (matchedMessageMappingIterator != messageMapping.end()) {
386 publishMappedMessage(staticMapping["mapped_topic"],
387 (*matchedMessageMappingIterator)["mapped_message"],
388 staticMapping["qos"],
389 staticMapping["retain"]);
390 } else {
391 VLOG(1) << " no matching mapped message found";
392 }
393 }
394 }
395
396 void MqttMapper::publishMappedMessages(const nlohmann::json& staticMapping, const iot::mqtt::packets::Publish& publish) {
397 if (staticMapping.is_object()) {
398 publishMappedMessage(staticMapping, publish);
399 } else if (staticMapping.is_array()) {
400 for (const nlohmann::json& concreteStaticMapping : staticMapping) {
401 publishMappedMessage(concreteStaticMapping, publish);
402 }
403 }
404 }
405
406} // namespace mqtt::lib
Class for changing the configuration.
Definition inja.hpp:2851
void publishMappedTemplate(const nlohmann::json &templateMapping, nlohmann::json &json)
static void extractSubscription(const nlohmann::json &topicLevelJson, const std::string &topic, std::list< iot::mqtt::Topic > &topicList)
void publishMappedMessage(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish)
void publishMappings(const iot::mqtt::packets::Publish &publish)
void publishMappedMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
nlohmann::json findMatchingTopicLevel(const nlohmann::json &topicLevel, const std::string &topic)
void publishMappedMessages(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish)
virtual void publishMapping(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)=0
const nlohmann::json & mappingJson
Definition MqttMapper.h:99
std::list< void * > pluginHandles
Definition MqttMapper.h:101
void publishMappedTemplates(const nlohmann::json &templateMapping, nlohmann::json &json, const iot::mqtt::packets::Publish &publish)
inja::Environment * injaEnvironment
Definition MqttMapper.h:103
MqttMapper(const nlohmann::json &mappingJson)
std::list< iot::mqtt::Topic > extractSubscriptions()
static void extractSubscriptions(const nlohmann::json &mappingJson, const std::string &topic, std::list< iot::mqtt::Topic > &topicList)
const std::string type
Definition inja.hpp:256
const SourceLocation location
Definition inja.hpp:259
const std::string message
Definition inja.hpp:257