2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
47#include <core/DynamicLoader.h>
48#include <iot/mqtt/Topic.h>
49#include <iot/mqtt/packets/Publish.h>
51#ifndef DOXYGEN_SHOULD_SKIP_THIS
54#pragma GCC diagnostic push
56#if __has_warning
("-Wcovered-switch-default")
57#pragma GCC diagnostic ignored "-Wcovered-switch-default"
59#if __has_warning
("-Wnrvo")
60#pragma GCC diagnostic ignored "-Wnrvo"
66#pragma GCC diagnostic pop
71#include <log/Logger.h>
73#include <nlohmann/json.hpp>
86 if (mappingJson.contains(
"plugins")) {
87 VLOG(1) <<
"Loading plugins ...";
89 for (
const nlohmann::json& pluginJson : mappingJson[
"plugins"]) {
90 const std::string plugin = pluginJson;
92 void* handle = core::DynamicLoader::dlOpen(plugin);
94 if (handle !=
nullptr) {
97 VLOG(1) <<
" Loading plugin: " << plugin <<
" ...";
99 const std::vector<mqtt::
lib::
Function>* loadedFunctions =
100 static_cast<std::vector<mqtt::
lib::
Function>*>(dlsym(handle,
"functions"));
101 if (loadedFunctions !=
nullptr) {
102 VLOG(1) <<
" Registering inja 'none void callbacks'";
103 for (
const mqtt::
lib::
Function& function : *loadedFunctions) {
104 VLOG(1) <<
" " << function
.name;
112 VLOG(1) <<
" Registering inja 'none void callbacks done'";
114 VLOG(1) <<
" No inja none 'void callbacks found' in plugin " << plugin;
118 static_cast<std::vector<mqtt::
lib::
VoidFunction>*>(dlsym(handle,
"voidFunctions"));
119 if (loadedVoidFunctions !=
nullptr) {
120 VLOG(1) <<
" Registering inja 'void callbacks'";
121 for (
const mqtt::
lib::
VoidFunction& voidFunction : *loadedVoidFunctions) {
122 VLOG(1) <<
" " << voidFunction
.name;
130 VLOG(1) <<
" Registering inja 'void callbacks' done";
132 VLOG(1) <<
" No inja 'void callbacks' found in plugin " << plugin;
135 VLOG(1) <<
" Loading plugin done: " << plugin;
137 VLOG(1) <<
" Error loading plugin: " << plugin;
141 VLOG(1) <<
"Loading plugins done";
149 core::DynamicLoader::dlClose(pluginHandle);
158 std::list<iot::
mqtt::Topic> topicList;
169 if (!matchingTopicLevel.empty()) {
170 const nlohmann::json& subscription = matchingTopicLevel[
"subscription"];
172 if (subscription.contains(
"static")) {
173 VLOG(1) <<
"Topic mapping found for:";
174 VLOG(1) <<
" Type: static";
175 VLOG(1) <<
" Topic: " << publish.getTopic();
176 VLOG(1) <<
" Message: " << publish.getMessage();
177 VLOG(1) <<
" QoS: " <<
static_cast<uint16_t>(publish.getQoS());
178 VLOG(1) <<
" Retain: " << publish.getRetain();
183 if (subscription.contains(
"value")) {
184 VLOG(1) <<
"Topic mapping found for:";
185 VLOG(1) <<
" Type: value";
186 VLOG(1) <<
" Topic: " << publish.getTopic();
187 VLOG(1) <<
" Message: " << publish.getMessage();
188 VLOG(1) <<
" QoS: " <<
static_cast<uint16_t>(publish.getQoS());
189 VLOG(1) <<
" Retain: " << publish.getRetain();
192 json[
"message"] = publish.getMessage();
197 if (subscription.contains(
"json")) {
198 VLOG(1) <<
"Topic mapping found for:";
199 VLOG(1) <<
" Type: json";
200 VLOG(1) <<
" Topic: " << publish.getTopic();
201 VLOG(1) <<
" Message: " << publish.getMessage();
202 VLOG(1) <<
" QoS: " <<
static_cast<uint16_t>(publish.getQoS());
203 VLOG(1) <<
" Retain: " << publish.getRetain();
207 json[
"message"] =
nlohmann::json::parse(publish.getMessage());
210 }
catch (
const nlohmann::json::parse_error& e) {
211 VLOG(1) <<
" Parsing message into json failed: " << publish.getMessage();
212 VLOG(1) <<
" What: " << e.what() <<
'\n'
213 <<
" Exception Id: " << e.id <<
'\n'
214 <<
" Byte position of error: " << e.byte;
222 const std::string& topic,
223 std::list<iot::
mqtt::Topic>& topicList) {
224 const std::string name = topicLevelJson[
"name"];
226 if (topicLevelJson.contains(
"subscription")) {
227 const uint8_t qoS = topicLevelJson[
"subscription"][
"qos"];
229 topicList.emplace_back(topic + ((topic.empty() || topic ==
"/") && !name.empty() ?
"" :
"/") + name, qoS);
232 if (topicLevelJson.contains(
"topic_level")) {
233 extractSubscriptions(topicLevelJson
, topic + ((topic.empty() || topic ==
"/") && !name.empty() ?
"" :
"/") + name
, topicList
);
239 const nlohmann::json& topicLevels = mappingJson[
"topic_level"];
241 if (topicLevels.is_object()) {
244 for (
const nlohmann::json& topicLevel : topicLevels) {
253 if (topicLevel.is_object()) {
254 const std::string::size_type slashPosition = topic.find(
'/');
255 const std::string topicLevelName = topic.substr(0, slashPosition);
257 if (topicLevel[
"name"] == topicLevelName || topicLevel[
"name"] ==
"+" || topicLevel[
"name"] ==
"#") {
258 if (slashPosition == std::string::npos) {
259 foundTopicLevel = topicLevel;
260 }
else if (topicLevel.contains(
"topic_level")) {
264 }
else if (topicLevel.is_array()) {
265 for (
const nlohmann::json& topicLevelEntry : topicLevel) {
268 if (!foundTopicLevel.empty()) {
274 return foundTopicLevel;
278 const std::string& mappingTemplate = templateMapping[
"mapping_template"];
279 const std::string& mappedTopic = templateMapping[
"mapped_topic"];
283 const std::string renderedTopic =
injaEnvironment->render(mappedTopic, json);
284 json[
"mapped_topic"] = renderedTopic;
286 VLOG(1) <<
" Mapped topic template: " << mappedTopic;
287 VLOG(1) <<
" -> " << renderedTopic;
291 const std::string renderedMessage =
injaEnvironment->render(mappingTemplate, json);
292 VLOG(1) <<
" Mapped message template: " << mappingTemplate;
293 VLOG(1) <<
" -> " << renderedMessage;
295 const nlohmann::json& suppressions = templateMapping[
"suppressions"];
296 const bool retain = templateMapping[
"retain"];
298 if (suppressions.empty() || std::find(suppressions.begin(), suppressions.end(), renderedMessage) == suppressions.end() ||
299 (retain && renderedMessage.empty())) {
300 const uint8_t qoS = templateMapping[
"qos"];
302 VLOG(1) <<
" Send mapping:";
303 VLOG(1) <<
" Topic: " << renderedTopic;
304 VLOG(1) <<
" Message: " << renderedMessage <<
"";
305 VLOG(1) <<
" QoS: " <<
static_cast<
int>(qoS);
306 VLOG(1) <<
" retain: " << retain;
310 VLOG(1) <<
" Rendered message: '" << renderedMessage <<
"' in suppression list:";
311 for (
const nlohmann::json& item : suppressions) {
312 VLOG(1) <<
" '" << item.get<std::string>() <<
"'";
314 VLOG(1) <<
" Send mapping: suppressed";
317 VLOG(1) <<
" Message template rendering failed: " << mappingTemplate <<
" : " << json.dump();
318 VLOG(1) <<
" What: " << e.what();
323 VLOG(1) <<
" Topic template rendering failed: " << mappingTemplate <<
" : " << json.dump();
324 VLOG(1) <<
" What: " << e.what();
333 json[
"topic"] = publish.getTopic();
334 json[
"qos"] = publish.getQoS();
335 json[
"retain"] = publish.getRetain();
336 json[
"package_identifier"] = publish.getPacketIdentifier();
339 VLOG(1) <<
" Render data: " << json.dump();
341 if (templateMapping.is_object()) {
344 for (
const nlohmann::json& concreteTemplateMapping : templateMapping) {
348 }
catch (
const nlohmann::json::exception& e) {
349 VLOG(1) <<
"JSON Exception during Render data:\n" << e.what();
354 VLOG(1) <<
" Mapped topic:";
355 VLOG(1) <<
" -> " << topic;
356 VLOG(1) <<
" Mapped message:";
357 VLOG(1) <<
" -> " << message;
358 VLOG(1) <<
" Send mapping:";
359 VLOG(1) <<
" Topic: " << topic;
360 VLOG(1) <<
" Message: " << message;
361 VLOG(1) <<
" QoS: " <<
static_cast<
int>(qoS);
362 VLOG(1) <<
" retain: " << retain;
368 const nlohmann::json& messageMapping = staticMapping[
"message_mapping"];
370 VLOG(1) <<
" Message mapping: " << messageMapping.dump();
372 if (messageMapping.is_object()) {
373 if (messageMapping[
"message"] == publish.getMessage()) {
375 staticMapping[
"mapped_topic"]
, messageMapping[
"mapped_message"]
, staticMapping[
"qos"]
, staticMapping[
"retain"]
);
377 VLOG(1) <<
" no matching mapped message found";
380 const nlohmann::json::const_iterator matchedMessageMappingIterator =
381 std::find_if(messageMapping.begin(), messageMapping.end(), [&publish](
const nlohmann::json& messageMappingCandidat) {
382 return messageMappingCandidat[
"message"] == publish.getMessage();
385 if (matchedMessageMappingIterator != messageMapping.end()) {
387 (*matchedMessageMappingIterator)[
"mapped_message"]
,
388 staticMapping[
"qos"]
,
389 staticMapping[
"retain"]
);
391 VLOG(1) <<
" no matching mapped message found";
397 if (staticMapping.is_object()) {
399 }
else if (staticMapping.is_array()) {
400 for (
const nlohmann::json& concreteStaticMapping : staticMapping) {
Class for changing the configuration.
void publishMappedTemplate(const nlohmann::json &templateMapping, nlohmann::json &json)
static void extractSubscription(const nlohmann::json &topicLevelJson, const std::string &topic, std::list< iot::mqtt::Topic > &topicList)
void publishMappedMessage(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish)
void publishMappings(const iot::mqtt::packets::Publish &publish)
void publishMappedMessage(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)
nlohmann::json findMatchingTopicLevel(const nlohmann::json &topicLevel, const std::string &topic)
void publishMappedMessages(const nlohmann::json &staticMapping, const iot::mqtt::packets::Publish &publish)
virtual void publishMapping(const std::string &topic, const std::string &message, uint8_t qoS, bool retain)=0
const nlohmann::json & mappingJson
std::list< void * > pluginHandles
void publishMappedTemplates(const nlohmann::json &templateMapping, nlohmann::json &json, const iot::mqtt::packets::Publish &publish)
inja::Environment * injaEnvironment
MqttMapper(const nlohmann::json &mappingJson)
std::list< iot::mqtt::Topic > extractSubscriptions()
static void extractSubscriptions(const nlohmann::json &mappingJson, const std::string &topic, std::list< iot::mqtt::Topic > &topicList)
const SourceLocation location
const std::string message