2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
46#include <core/DynamicLoader.h>
47#include <iot/mqtt/Topic.h>
49#ifndef DOXYGEN_SHOULD_SKIP_THIS
51#include "nlohmann/json-schema.hpp"
57#pragma GCC diagnostic push
59#if __has_warning
("-Wcovered-switch-default")
60#pragma GCC diagnostic ignored "-Wcovered-switch-default"
62#if __has_warning
("-Wnrvo")
63#pragma GCC diagnostic ignored "-Wnrvo"
65#if __has_warning
("-Wsuggest-override")
66#pragma GCC diagnostic ignored "-Wsuggest-override"
68#if __has_warning
("-Wmissing-noreturn")
69#pragma GCC diagnostic ignored "-Wmissing-noreturn"
71#if __has_warning
("-Wdeprecated-copy-with-user-provided-dtor")
72#pragma GCC diagnostic ignored "-Wdeprecated-copy-with-user-provided-dtor"
78#pragma GCC diagnostic pop
82#include <log/Logger.h>
84#include <nlohmann/json.hpp>
94#include "mapping-schema.json.h"
106 for (
void* pluginHandle : pluginHandles) {
107 core::DynamicLoader::dlClose(pluginHandle);
118 for (
void* handle : pluginHandles) {
119 core::DynamicLoader::dlClose(handle);
129 }
catch (
const std::exception& e) {
130 throw std::runtime_error(
"Validating JSON failed: Mapping JSON = " + mappingJson.dump(4) +
"\n" + e.what());
134 mappingJson = mappingJson.patch(defaultPatch);
135 }
catch (
const std::exception& e) {
136 throw std::runtime_error(
"Patching JSON with default patch failed: Default patch = " + defaultPatch.dump(4) +
"\n" + e.what());
139 bool mustReconnect =
this->mappingJson[
"connection"] != mappingJson[
"connection"];
143 if (mappingJson[
"mapping"].contains(
"plugins")) {
144 for (
const nlohmann::json& pluginJson : mappingJson[
"mapping"][
"plugins"]) {
145 const std::string plugin = pluginJson;
147 void* handle = core::DynamicLoader::dlOpen(plugin);
149 if (handle !=
nullptr) {
150 pluginHandles.push_back(handle);
152 VLOG(1) <<
" Loading plugin: " << plugin <<
" ...";
154 const std::vector<mqtt::lib::Function>* loadedFunctions =
155 static_cast<std::vector<mqtt::lib::Function>*>(core::DynamicLoader::dlSym(handle,
"functions"));
156 if (loadedFunctions !=
nullptr) {
157 VLOG(1) <<
" Registering inja 'none void callbacks'";
158 for (
const mqtt::lib::Function& function : *loadedFunctions) {
159 VLOG(1) <<
" " << function.name;
161 if (function.numArgs >= 0) {
162 injaEnvironment->add_callback(function.name, function.numArgs, function.function);
164 injaEnvironment->add_callback(function.name, function.function);
167 VLOG(1) <<
" Registering inja 'none void callbacks done'";
169 VLOG(1) <<
" No inja none 'void callbacks found' in plugin " << plugin;
172 const std::vector<mqtt::lib::VoidFunction>* loadedVoidFunctions =
173 static_cast<std::vector<mqtt::lib::VoidFunction>*>(core::DynamicLoader::dlSym(handle,
"voidFunctions"));
174 if (loadedVoidFunctions !=
nullptr) {
175 VLOG(1) <<
" Registering inja 'void callbacks'";
176 for (
const mqtt::lib::VoidFunction& voidFunction : *loadedVoidFunctions) {
177 VLOG(1) <<
" " << voidFunction.name;
179 if (voidFunction.numArgs >= 0) {
180 injaEnvironment->add_void_callback(voidFunction.name, voidFunction.numArgs, voidFunction.function);
182 injaEnvironment->add_void_callback(voidFunction.name, voidFunction.function);
185 VLOG(1) <<
" Registering inja 'void callbacks' done";
187 VLOG(1) <<
" No inja 'void callbacks' found in plugin " << plugin;
190 VLOG(1) <<
" Loading plugin done: " << plugin;
192 VLOG(1) <<
" Error loading plugin: " << plugin;
193 throw std::runtime_error(
"Error loading plugin '" + plugin +
"': " + core::DynamicLoader::dlError());
197 VLOG(1) <<
"Loading plugins done";
200 return mustReconnect;
212 return mappingJson[
"connection"];
216 std::list<iot::mqtt::Topic> topicList;
218 extractSubscriptions(mappingJson[
"mapping"],
"", topicList);
233 if (mappingJson.contains(
"mapping") && !mappingJson[
"mapping"].empty()) {
236 if (!matchingTopicLevel.empty()) {
237 const nlohmann::json& subscription = matchingTopicLevel[
"subscription"];
239 if (subscription.contains(
"static")) {
240 VLOG(1) <<
"Topic mapping found for:";
241 VLOG(1) <<
" Type: static";
242 VLOG(1) <<
" Topic: " << publish.getTopic();
243 VLOG(1) <<
" Message: " << publish.getMessage();
244 VLOG(1) <<
" QoS: " <<
static_cast<uint16_t>(publish.getQoS());
245 VLOG(1) <<
" Retain: " << publish.getRetain();
250 if (subscription.contains(
"value")) {
251 VLOG(1) <<
"Topic mapping found for:";
252 VLOG(1) <<
" Type: value";
253 VLOG(1) <<
" Topic: " << publish.getTopic();
254 VLOG(1) <<
" Message: " << publish.getMessage();
255 VLOG(1) <<
" QoS: " <<
static_cast<uint16_t>(publish.getQoS());
256 VLOG(1) <<
" Retain: " << publish.getRetain();
259 json[
"message"] = publish.getMessage();
264 if (subscription.contains(
"json")) {
265 VLOG(1) <<
"Topic mapping found for:";
266 VLOG(1) <<
" Type: json";
267 VLOG(1) <<
" Topic: " << publish.getTopic();
268 VLOG(1) <<
" Message: " << publish.getMessage();
269 VLOG(1) <<
" QoS: " <<
static_cast<uint16_t>(publish.getQoS());
270 VLOG(1) <<
" Retain: " << publish.getRetain();
274 json[
"message"] = nlohmann::json::parse(publish.getMessage());
277 }
catch (
const nlohmann::json::parse_error& e) {
278 VLOG(1) <<
" Parsing message into json failed: " << publish.getMessage();
279 VLOG(1) <<
" What: " << e.what() <<
'\n'
280 <<
" Exception Id: " << e.id <<
'\n'
281 <<
" Byte position of error: " << e.byte;
287 return mappedPublishes;
291 const std::string& topic,
292 std::list<iot::mqtt::Topic>& topicList) {
293 const std::string name = topicLevelJson[
"name"];
295 if (topicLevelJson.contains(
"subscription")) {
296 const uint8_t qoS = topicLevelJson[
"subscription"][
"qos"];
298 topicList.emplace_back(topic + ((topic.empty() || topic ==
"/") && !name.empty() ?
"" :
"/") + name, qoS);
301 if (topicLevelJson.contains(
"topic_level")) {
302 extractSubscriptions(topicLevelJson, topic + ((topic.empty() || topic ==
"/") && !name.empty() ?
"" :
"/") + name, topicList);
307 MqttMapper::extractSubscriptions(
const nlohmann::json& mappingJson,
const std::string& topic, std::list<iot::mqtt::Topic>& topicList) {
308 if (mappingJson.contains(
"topic_level")) {
309 const nlohmann::json& topicLevels = mappingJson[
"topic_level"];
311 if (topicLevels.is_object()) {
314 for (
const nlohmann::json& topicLevel : topicLevels) {
315 extractSubscription(topicLevel, topic, topicList);
324 if (topicLevel.is_object()) {
325 const std::string::size_type slashPosition = topic.find(
'/');
326 const std::string topicLevelName = topic.substr(0, slashPosition);
328 if (topicLevel[
"name"] == topicLevelName || topicLevel[
"name"] ==
"+" || topicLevel[
"name"] ==
"#") {
329 if (slashPosition == std::string::npos) {
330 foundTopicLevel = topicLevel;
331 }
else if (topicLevel.contains(
"topic_level")) {
335 }
else if (topicLevel.is_array()) {
336 for (
const nlohmann::json& topicLevelEntry : topicLevel) {
337 foundTopicLevel = findMatchingTopicLevel(topicLevelEntry, topic);
339 if (!foundTopicLevel.empty()) {
345 return foundTopicLevel;
349 const std::string& mappingTemplate = templateMapping[
"mapping_template"];
350 const std::string& mappedTopic = templateMapping[
"mapped_topic"];
355 json[
"mapped_topic"] = renderedTopic;
357 VLOG(1) <<
" Mapped topic template: " << mappedTopic;
358 VLOG(1) <<
" -> " << renderedTopic;
363 VLOG(1) <<
" Mapped message template: " << mappingTemplate;
364 VLOG(1) <<
" -> " << renderedMessage;
366 const nlohmann::json& suppressions = templateMapping[
"suppressions"];
367 const bool retain = templateMapping[
"retain"];
369 if (suppressions.empty() || std::find(suppressions.begin(), suppressions.end(), renderedMessage) == suppressions.end() ||
370 (retain && renderedMessage.empty())) {
371 const uint8_t qoS = templateMapping[
"qos"];
372 const double delay = templateMapping[
"delay"];
374 VLOG(1) <<
" Send mapping:" << (delay > 0 ?
" delayed" :
"");
375 VLOG(1) <<
" Topic: " << renderedTopic;
376 VLOG(1) <<
" Message: " << renderedMessage <<
"";
377 VLOG(1) <<
" QoS: " <<
static_cast<
int>(qoS);
378 VLOG(1) <<
" retain: " << retain;
379 VLOG(1) <<
" Delay: " << delay;
383 VLOG(1) <<
" Rendered message: '" << renderedMessage <<
"' in suppression list:";
384 for (
const nlohmann::json& item : suppressions) {
385 VLOG(1) <<
" '" << item.get<std::string>() <<
"'";
387 VLOG(1) <<
" Send mapping: suppressed";
390 VLOG(1) <<
" Message template rendering failed: " << mappingTemplate <<
" : " << json.dump();
391 VLOG(1) <<
" What: " << e.what();
396 VLOG(1) <<
" Topic template rendering failed: " << mappingTemplate <<
" : " << json.dump();
397 VLOG(1) <<
" What: " << e.what();
407 json[
"topic"] = publish.getTopic();
408 json[
"qos"] = publish.getQoS();
409 json[
"retain"] = publish.getRetain();
410 json[
"package_identifier"] = publish.getPacketIdentifier();
413 VLOG(1) <<
" Render data: " << json.dump();
415 if (templateMapping.is_object()) {
418 for (
const nlohmann::json& concreteTemplateMapping : templateMapping) {
419 publishMappedTemplate(concreteTemplateMapping, json, mappedPublishes);
422 }
catch (
const nlohmann::json::exception& e) {
423 VLOG(1) <<
"JSON Exception during Render data:\n" << e.what();
428 const std::string& topic,
const std::string& message, uint8_t qoS,
bool retain,
double delay,
MappedPublishes& mappedPublishes) {
429 VLOG(1) <<
" Mapped topic:";
430 VLOG(1) <<
" -> " << topic;
431 VLOG(1) <<
" Mapped message:";
432 VLOG(1) <<
" -> " << message;
433 VLOG(1) <<
" Send mapping:" << (delay > 0 ?
" delayed" :
"");
434 VLOG(1) <<
" Topic: " << topic;
435 VLOG(1) <<
" Message: " << message;
436 VLOG(1) <<
" QoS: " <<
static_cast<
int>(qoS);
437 VLOG(1) <<
" retain: " << retain;
438 VLOG(1) <<
" Delay: " << delay;
441 mappedPublishes.first.emplace_back(0, topic, message, qoS,
false, retain);
443 mappedPublishes.second.push_back({delay, iot::
mqtt::
packets::Publish(0, topic, message, qoS,
false, retain)});
450 const nlohmann::json& messageMapping = staticMapping[
"message_mapping"];
452 VLOG(1) <<
" Message mapping: " << messageMapping.dump();
454 if (messageMapping.is_object()) {
455 if (messageMapping[
"message"] == publish.getMessage()) {
456 publishMappedMessage(staticMapping[
"mapped_topic"],
457 messageMapping[
"mapped_message"],
458 staticMapping[
"qos"],
459 staticMapping[
"retain"],
460 staticMapping[
"delay"],
463 VLOG(1) <<
" no matching mapped message found";
466 const nlohmann::json::const_iterator matchedMessageMappingIterator =
467 std::find_if(messageMapping.begin(), messageMapping.end(), [&publish](
const nlohmann::json& messageMappingCandidat) {
468 return messageMappingCandidat[
"message"] == publish.getMessage();
471 if (matchedMessageMappingIterator != messageMapping.end()) {
472 publishMappedMessage(staticMapping[
"mapped_topic"],
473 (*matchedMessageMappingIterator)[
"mapped_message"],
474 staticMapping[
"qos"],
475 staticMapping[
"retain"],
476 staticMapping[
"delay"],
479 VLOG(1) <<
" no matching mapped message found";
487 if (staticMapping.is_object()) {
489 }
else if (staticMapping.is_array()) {
490 for (
const nlohmann::json& concreteStaticMapping : staticMapping) {
491 publishMappedMessage(concreteStaticMapping, publish, mappedPublishes);
Class for changing the configuration.
std::string render(std::string_view input, const json &data)
static void extractSubscription(const nlohmann::json &topicLevelJson, const std::string &topic, std::list< iot::mqtt::Topic > &topicList)
std::list< iot::mqtt::Topic > extractSubscriptions() const
MappedPublishes getMappings(const iot::mqtt::packets::Publish &publish)
nlohmann::json findMatchingTopicLevel(const nlohmann::json &topicLevel, const std::string &topic)
nlohmann::json mappingJson
bool setMapping(nlohmann::json mappingJson)
static const nlohmann::json validate(const nlohmann::json &json)
std::pair< std::vector< iot::mqtt::packets::Publish >, std::vector< ScheduledPublish > > MappedPublishes
void getTemplateMappings(const nlohmann::json &templateMapping, nlohmann::json &json, const iot::mqtt::packets::Publish &publish, MappedPublishes &mappedPublishes)
static const nlohmann::json_schema::json_validator validator
std::list< void * > pluginHandles
static const nlohmann::json validate(const nlohmann::json &json, nlohmann::json_schema::basic_error_handler &err)
static void publishMappedMessage(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish, MappedPublishes &mappedPublishes)
static const std::string mappingJsonSchemaString
const nlohmann::json & getMapping() const
inja::Environment * injaEnvironment
static void getStaticMappings(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish, MappedPublishes &mappedPublishes)
static const std::string & getSchema()
const nlohmann::json & getConnection() const
static void publishMappedMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain, double delay, MappedPublishes &mappedPublishes)
void publishMappedTemplate(const nlohmann::json &templateMapping, nlohmann::json &json, MappedPublishes &mappedPublishes)
json validate(const json &, error_handler &, const json_uri &initial_uri=json_uri("#")) const
json validate(const json &) const
void default_string_format_check(const std::string &format, const std::string &value)
const SourceLocation location
const std::string message