MQTTSuite
Loading...
Searching...
No Matches
MqttMapper.cpp
Go to the documentation of this file.
1/*
2 * MQTTSuite - A lightweight MQTT Integration System
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2022, 2023, 2024, 2025, 2026
5 *
6 * This program is free software: you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the Free
8 * Software Foundation, either version 3 of the License, or (at your option)
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * MIT License
22 *
23 * Permission is hereby granted, free of charge, to any person obtaining a copy
24 * of this software and associated documentation files (the "Software"), to deal
25 * in the Software without restriction, including without limitation the rights
26 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
27 * copies of the Software, and to permit persons to whom the Software is
28 * furnished to do so, subject to the following conditions:
29 *
30 * The above copyright notice and this permission notice shall be included in
31 * all copies or substantial portions of the Software.
32 *
33 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 * THE SOFTWARE.
40 */
41
42#include "MqttMapper.h"
43
45
46#include <core/DynamicLoader.h>
47#include <iot/mqtt/Topic.h>
48
49#ifndef DOXYGEN_SHOULD_SKIP_THIS
50
51#include "nlohmann/json-schema.hpp"
52
53#include <cmath>
54#include <exception>
55
56#ifdef __GNUC__
57#pragma GCC diagnostic push
58#ifdef __has_warning
59#if __has_warning("-Wcovered-switch-default")
60#pragma GCC diagnostic ignored "-Wcovered-switch-default"
61#endif
62#if __has_warning("-Wnrvo")
63#pragma GCC diagnostic ignored "-Wnrvo"
64#endif
65#if __has_warning("-Wsuggest-override")
66#pragma GCC diagnostic ignored "-Wsuggest-override"
67#endif
68#if __has_warning("-Wmissing-noreturn")
69#pragma GCC diagnostic ignored "-Wmissing-noreturn"
70#endif
71#if __has_warning("-Wdeprecated-copy-with-user-provided-dtor")
72#pragma GCC diagnostic ignored "-Wdeprecated-copy-with-user-provided-dtor"
73#endif
74#endif
75#endif
76#include "inja.hpp"
77#ifdef __GNUC_
78#pragma GCC diagnostic pop
79#endif
80
81#include <algorithm>
82#include <log/Logger.h>
83#include <map>
84#include <nlohmann/json.hpp>
85#include <stdexcept>
86#include <vector>
87
88#endif
89
90// IWYU pragma: no_include <nlohmann/detail/iterators/iter_impl.hpp>
91
92namespace mqtt::lib {
93
94#include "mapping-schema.json.h" // definition of mappingJsonSchemaString
95
98
102
104 delete injaEnvironment;
105
106 for (void* pluginHandle : pluginHandles) {
107 core::DynamicLoader::dlClose(pluginHandle);
108 }
109 }
110
111 const std::string& MqttMapper::getSchema() {
113 }
114
115 bool MqttMapper::setMapping(nlohmann::json mappingJson) { // can throw
116 delete injaEnvironment;
117
118 for (void* handle : pluginHandles) {
119 core::DynamicLoader::dlClose(handle);
120 }
121 pluginHandles.clear();
122
124
125 nlohmann::json defaultPatch;
126
127 try {
128 defaultPatch = validator.validate(mappingJson);
129 } catch (const std::exception& e) {
130 throw std::runtime_error("Validating JSON failed: Mapping JSON = " + mappingJson.dump(4) + "\n" + e.what());
131 }
132
133 try {
134 mappingJson = mappingJson.patch(defaultPatch);
135 } catch (const std::exception& e) {
136 throw std::runtime_error("Patching JSON with default patch failed: Default patch = " + defaultPatch.dump(4) + "\n" + e.what());
137 }
138
139 bool mustReconnect = this->mappingJson["connection"] != mappingJson["connection"];
140
141 this->mappingJson = mappingJson;
142
143 if (mappingJson["mapping"].contains("plugins")) {
144 for (const nlohmann::json& pluginJson : mappingJson["mapping"]["plugins"]) {
145 const std::string plugin = pluginJson;
146
147 void* handle = core::DynamicLoader::dlOpen(plugin);
148
149 if (handle != nullptr) {
150 pluginHandles.push_back(handle);
151
152 VLOG(1) << " Loading plugin: " << plugin << " ...";
153
154 const std::vector<mqtt::lib::Function>* loadedFunctions =
155 static_cast<std::vector<mqtt::lib::Function>*>(core::DynamicLoader::dlSym(handle, "functions"));
156 if (loadedFunctions != nullptr) {
157 VLOG(1) << " Registering inja 'none void callbacks'";
158 for (const mqtt::lib::Function& function : *loadedFunctions) {
159 VLOG(1) << " " << function.name;
160
161 if (function.numArgs >= 0) {
162 injaEnvironment->add_callback(function.name, function.numArgs, function.function);
163 } else {
164 injaEnvironment->add_callback(function.name, function.function);
165 }
166 }
167 VLOG(1) << " Registering inja 'none void callbacks done'";
168 } else {
169 VLOG(1) << " No inja none 'void callbacks found' in plugin " << plugin;
170 }
171
172 const std::vector<mqtt::lib::VoidFunction>* loadedVoidFunctions =
173 static_cast<std::vector<mqtt::lib::VoidFunction>*>(core::DynamicLoader::dlSym(handle, "voidFunctions"));
174 if (loadedVoidFunctions != nullptr) {
175 VLOG(1) << " Registering inja 'void callbacks'";
176 for (const mqtt::lib::VoidFunction& voidFunction : *loadedVoidFunctions) {
177 VLOG(1) << " " << voidFunction.name;
178
179 if (voidFunction.numArgs >= 0) {
180 injaEnvironment->add_void_callback(voidFunction.name, voidFunction.numArgs, voidFunction.function);
181 } else {
182 injaEnvironment->add_void_callback(voidFunction.name, voidFunction.function);
183 }
184 }
185 VLOG(1) << " Registering inja 'void callbacks' done";
186 } else {
187 VLOG(1) << " No inja 'void callbacks' found in plugin " << plugin;
188 }
189
190 VLOG(1) << " Loading plugin done: " << plugin;
191 } else {
192 VLOG(1) << " Error loading plugin: " << plugin;
193 throw std::runtime_error("Error loading plugin '" + plugin + "': " + core::DynamicLoader::dlError());
194 }
195 }
196
197 VLOG(1) << "Loading plugins done";
198 }
199
200 return mustReconnect;
201 }
202
203 const nlohmann::json& MqttMapper::getMapping() const {
204 return mappingJson;
205 }
206
207 std::string MqttMapper::dump() {
208 return mappingJson.dump();
209 }
210
211 const nlohmann::json& MqttMapper::getConnection() const {
212 return mappingJson["connection"];
213 }
214
216 std::list<iot::mqtt::Topic> topicList;
217
218 extractSubscriptions(mappingJson["mapping"], "", topicList);
219
220 return topicList;
221 }
222
223 const nlohmann::json MqttMapper::validate(const nlohmann::json& json) {
224 return validator.validate(json);
225 }
226
228 return validator.validate(json, err);
229 }
230
231 MqttMapper::MappedPublishes MqttMapper::getMappings(const iot::mqtt::packets::Publish& publish) {
232 MappedPublishes mappedPublishes;
233 if (mappingJson.contains("mapping") && !mappingJson["mapping"].empty()) {
234 nlohmann::json matchingTopicLevel = findMatchingTopicLevel(mappingJson["mapping"]["topic_level"], publish.getTopic());
235
236 if (!matchingTopicLevel.empty()) {
237 const nlohmann::json& subscription = matchingTopicLevel["subscription"];
238
239 if (subscription.contains("static")) {
240 VLOG(1) << "Topic mapping found for:";
241 VLOG(1) << " Type: static";
242 VLOG(1) << " Topic: " << publish.getTopic();
243 VLOG(1) << " Message: " << publish.getMessage();
244 VLOG(1) << " QoS: " << static_cast<uint16_t>(publish.getQoS());
245 VLOG(1) << " Retain: " << publish.getRetain();
246
247 getStaticMappings(subscription["static"], publish, mappedPublishes);
248 }
249
250 if (subscription.contains("value")) {
251 VLOG(1) << "Topic mapping found for:";
252 VLOG(1) << " Type: value";
253 VLOG(1) << " Topic: " << publish.getTopic();
254 VLOG(1) << " Message: " << publish.getMessage();
255 VLOG(1) << " QoS: " << static_cast<uint16_t>(publish.getQoS());
256 VLOG(1) << " Retain: " << publish.getRetain();
257
258 nlohmann::json json;
259 json["message"] = publish.getMessage();
260
261 getTemplateMappings(subscription["value"], json, publish, mappedPublishes);
262 }
263
264 if (subscription.contains("json")) {
265 VLOG(1) << "Topic mapping found for:";
266 VLOG(1) << " Type: json";
267 VLOG(1) << " Topic: " << publish.getTopic();
268 VLOG(1) << " Message: " << publish.getMessage();
269 VLOG(1) << " QoS: " << static_cast<uint16_t>(publish.getQoS());
270 VLOG(1) << " Retain: " << publish.getRetain();
271
272 try {
273 nlohmann::json json;
274 json["message"] = nlohmann::json::parse(publish.getMessage());
275
276 getTemplateMappings(subscription["json"], json, publish, mappedPublishes);
277 } catch (const nlohmann::json::parse_error& e) {
278 VLOG(1) << " Parsing message into json failed: " << publish.getMessage();
279 VLOG(1) << " What: " << e.what() << '\n'
280 << " Exception Id: " << e.id << '\n'
281 << " Byte position of error: " << e.byte;
282 }
283 }
284 }
285 }
286
287 return mappedPublishes;
288 }
289
290 void MqttMapper::extractSubscription(const nlohmann::json& topicLevelJson,
291 const std::string& topic,
292 std::list<iot::mqtt::Topic>& topicList) {
293 const std::string name = topicLevelJson["name"];
294
295 if (topicLevelJson.contains("subscription")) {
296 const uint8_t qoS = topicLevelJson["subscription"]["qos"];
297
298 topicList.emplace_back(topic + ((topic.empty() || topic == "/") && !name.empty() ? "" : "/") + name, qoS);
299 }
300
301 if (topicLevelJson.contains("topic_level")) {
302 extractSubscriptions(topicLevelJson, topic + ((topic.empty() || topic == "/") && !name.empty() ? "" : "/") + name, topicList);
303 }
304 }
305
306 void
307 MqttMapper::extractSubscriptions(const nlohmann::json& mappingJson, const std::string& topic, std::list<iot::mqtt::Topic>& topicList) {
308 if (mappingJson.contains("topic_level")) {
309 const nlohmann::json& topicLevels = mappingJson["topic_level"];
310
311 if (topicLevels.is_object()) {
312 extractSubscription(topicLevels, topic, topicList);
313 } else {
314 for (const nlohmann::json& topicLevel : topicLevels) {
315 extractSubscription(topicLevel, topic, topicList);
316 }
317 }
318 }
319 }
320
321 nlohmann::json MqttMapper::findMatchingTopicLevel(const nlohmann::json& topicLevel, const std::string& topic) {
322 nlohmann::json foundTopicLevel;
323
324 if (topicLevel.is_object()) {
325 const std::string::size_type slashPosition = topic.find('/');
326 const std::string topicLevelName = topic.substr(0, slashPosition);
327
328 if (topicLevel["name"] == topicLevelName || topicLevel["name"] == "+" || topicLevel["name"] == "#") {
329 if (slashPosition == std::string::npos) {
330 foundTopicLevel = topicLevel;
331 } else if (topicLevel.contains("topic_level")) {
332 foundTopicLevel = findMatchingTopicLevel(topicLevel["topic_level"], topic.substr(slashPosition + 1));
333 }
334 }
335 } else if (topicLevel.is_array()) {
336 for (const nlohmann::json& topicLevelEntry : topicLevel) {
337 foundTopicLevel = findMatchingTopicLevel(topicLevelEntry, topic);
338
339 if (!foundTopicLevel.empty()) {
340 break;
341 }
342 }
343 }
344
345 return foundTopicLevel;
346 }
347
348 void MqttMapper::publishMappedTemplate(const nlohmann::json& templateMapping, nlohmann::json& json, MappedPublishes& mappedPublishes) {
349 const std::string& mappingTemplate = templateMapping["mapping_template"];
350 const std::string& mappedTopic = templateMapping["mapped_topic"];
351
352 try {
353 // Render topic
354 const std::string renderedTopic = injaEnvironment->render(mappedTopic, json);
355 json["mapped_topic"] = renderedTopic;
356
357 VLOG(1) << " Mapped topic template: " << mappedTopic;
358 VLOG(1) << " -> " << renderedTopic;
359
360 try {
361 // Render message
362 const std::string renderedMessage = injaEnvironment->render(mappingTemplate, json);
363 VLOG(1) << " Mapped message template: " << mappingTemplate;
364 VLOG(1) << " -> " << renderedMessage;
365
366 const nlohmann::json& suppressions = templateMapping["suppressions"];
367 const bool retain = templateMapping["retain"];
368
369 if (suppressions.empty() || std::find(suppressions.begin(), suppressions.end(), renderedMessage) == suppressions.end() ||
370 (retain && renderedMessage.empty())) {
371 const uint8_t qoS = templateMapping["qos"];
372 const double delay = templateMapping["delay"];
373
374 VLOG(1) << " Send mapping:" << (delay > 0 ? " delayed" : "");
375 VLOG(1) << " Topic: " << renderedTopic;
376 VLOG(1) << " Message: " << renderedMessage << "";
377 VLOG(1) << " QoS: " << static_cast<int>(qoS);
378 VLOG(1) << " retain: " << retain;
379 VLOG(1) << " Delay: " << delay;
380
381 publishMappedMessage(renderedTopic, renderedMessage, qoS, retain, delay, mappedPublishes);
382 } else {
383 VLOG(1) << " Rendered message: '" << renderedMessage << "' in suppression list:";
384 for (const nlohmann::json& item : suppressions) {
385 VLOG(1) << " '" << item.get<std::string>() << "'";
386 }
387 VLOG(1) << " Send mapping: suppressed";
388 }
389 } catch (const inja::InjaError& e) {
390 VLOG(1) << " Message template rendering failed: " << mappingTemplate << " : " << json.dump();
391 VLOG(1) << " What: " << e.what();
392 VLOG(1) << " INJA: " << e.type << ": " << e.message;
393 VLOG(1) << " INJA (line:column):" << e.location.line << ":" << e.location.column;
394 }
395 } catch (const inja::InjaError& e) {
396 VLOG(1) << " Topic template rendering failed: " << mappingTemplate << " : " << json.dump();
397 VLOG(1) << " What: " << e.what();
398 VLOG(1) << " INJA: " << e.type << ": " << e.message;
399 VLOG(1) << " INJA (line:column):" << e.location.line << ":" << e.location.column;
400 }
401 }
402
403 void MqttMapper::getTemplateMappings(const nlohmann::json& templateMapping,
404 nlohmann::json& json,
405 const iot::mqtt::packets::Publish& publish,
406 MappedPublishes& mappedPublishes) {
407 json["topic"] = publish.getTopic();
408 json["qos"] = publish.getQoS();
409 json["retain"] = publish.getRetain();
410 json["package_identifier"] = publish.getPacketIdentifier();
411
412 try {
413 VLOG(1) << " Render data: " << json.dump();
414
415 if (templateMapping.is_object()) {
416 publishMappedTemplate(templateMapping, json, mappedPublishes);
417 } else {
418 for (const nlohmann::json& concreteTemplateMapping : templateMapping) {
419 publishMappedTemplate(concreteTemplateMapping, json, mappedPublishes);
420 }
421 }
422 } catch (const nlohmann::json::exception& e) {
423 VLOG(1) << "JSON Exception during Render data:\n" << e.what();
424 }
425 }
426
428 const std::string& topic, const std::string& message, uint8_t qoS, bool retain, double delay, MappedPublishes& mappedPublishes) {
429 VLOG(1) << " Mapped topic:";
430 VLOG(1) << " -> " << topic;
431 VLOG(1) << " Mapped message:";
432 VLOG(1) << " -> " << message;
433 VLOG(1) << " Send mapping:" << (delay > 0 ? " delayed" : "");
434 VLOG(1) << " Topic: " << topic;
435 VLOG(1) << " Message: " << message;
436 VLOG(1) << " QoS: " << static_cast<int>(qoS);
437 VLOG(1) << " retain: " << retain;
438 VLOG(1) << " Delay: " << delay;
439
440 if (delay < 0.0) {
441 mappedPublishes.first.emplace_back(0, topic, message, qoS, false, retain);
442 } else {
443 mappedPublishes.second.push_back({delay, iot::mqtt::packets::Publish(0, topic, message, qoS, false, retain)});
444 }
445 }
446
447 void MqttMapper::publishMappedMessage(const nlohmann::json& staticMapping,
448 const iot::mqtt::packets::Publish& publish,
449 MappedPublishes& mappedPublishes) {
450 const nlohmann::json& messageMapping = staticMapping["message_mapping"];
451
452 VLOG(1) << " Message mapping: " << messageMapping.dump();
453
454 if (messageMapping.is_object()) {
455 if (messageMapping["message"] == publish.getMessage()) {
456 publishMappedMessage(staticMapping["mapped_topic"],
457 messageMapping["mapped_message"],
458 staticMapping["qos"],
459 staticMapping["retain"],
460 staticMapping["delay"],
461 mappedPublishes);
462 } else {
463 VLOG(1) << " no matching mapped message found";
464 }
465 } else {
466 const nlohmann::json::const_iterator matchedMessageMappingIterator =
467 std::find_if(messageMapping.begin(), messageMapping.end(), [&publish](const nlohmann::json& messageMappingCandidat) {
468 return messageMappingCandidat["message"] == publish.getMessage();
469 });
470
471 if (matchedMessageMappingIterator != messageMapping.end()) {
472 publishMappedMessage(staticMapping["mapped_topic"],
473 (*matchedMessageMappingIterator)["mapped_message"],
474 staticMapping["qos"],
475 staticMapping["retain"],
476 staticMapping["delay"],
477 mappedPublishes);
478 } else {
479 VLOG(1) << " no matching mapped message found";
480 }
481 }
482 }
483
484 void MqttMapper::getStaticMappings(const nlohmann::json& staticMapping,
485 const iot::mqtt::packets::Publish& publish,
486 MappedPublishes& mappedPublishes) {
487 if (staticMapping.is_object()) {
488 publishMappedMessage(staticMapping, publish, mappedPublishes);
489 } else if (staticMapping.is_array()) {
490 for (const nlohmann::json& concreteStaticMapping : staticMapping) {
491 publishMappedMessage(concreteStaticMapping, publish, mappedPublishes);
492 }
493 }
494 }
495
496} // namespace mqtt::lib
Class for changing the configuration.
Definition inja.hpp:2993
std::string render(std::string_view input, const json &data)
Definition inja.hpp:3093
static void extractSubscription(const nlohmann::json &topicLevelJson, const std::string &topic, std::list< iot::mqtt::Topic > &topicList)
std::list< iot::mqtt::Topic > extractSubscriptions() const
MappedPublishes getMappings(const iot::mqtt::packets::Publish &publish)
nlohmann::json findMatchingTopicLevel(const nlohmann::json &topicLevel, const std::string &topic)
nlohmann::json mappingJson
Definition MqttMapper.h:129
bool setMapping(nlohmann::json mappingJson)
static const nlohmann::json validate(const nlohmann::json &json)
std::pair< std::vector< iot::mqtt::packets::Publish >, std::vector< ScheduledPublish > > MappedPublishes
Definition MqttMapper.h:83
void getTemplateMappings(const nlohmann::json &templateMapping, nlohmann::json &json, const iot::mqtt::packets::Publish &publish, MappedPublishes &mappedPublishes)
static const nlohmann::json_schema::json_validator validator
Definition MqttMapper.h:135
std::list< void * > pluginHandles
Definition MqttMapper.h:131
static const nlohmann::json validate(const nlohmann::json &json, nlohmann::json_schema::basic_error_handler &err)
static void publishMappedMessage(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish, MappedPublishes &mappedPublishes)
static const std::string mappingJsonSchemaString
Definition MqttMapper.h:137
const nlohmann::json & getMapping() const
inja::Environment * injaEnvironment
Definition MqttMapper.h:133
static void getStaticMappings(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish, MappedPublishes &mappedPublishes)
static const std::string & getSchema()
const nlohmann::json & getConnection() const
static void publishMappedMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain, double delay, MappedPublishes &mappedPublishes)
void publishMappedTemplate(const nlohmann::json &templateMapping, nlohmann::json &json, MappedPublishes &mappedPublishes)
json validate(const json &, error_handler &, const json_uri &initial_uri=json_uri("#")) const
void default_string_format_check(const std::string &format, const std::string &value)
const std::string type
Definition inja.hpp:283
const SourceLocation location
Definition inja.hpp:286
const std::string message
Definition inja.hpp:284