282 {
283 if (connack.getReturnCode() == 0) {
284 bool sendDisconnectFlag = true;
285
287 VLOG(0) << "MQTT Subscribe";
288
289 try {
290 std::list<iot::mqtt::Topic> topicList;
293 std::back_inserter(topicList),
294 [
qoSDefault = this->qoSDefault](
const std::string& compositTopic) -> iot::mqtt::Topic {
295 std::size_t pos = compositTopic.rfind("##");
296
297 const std::string topic = compositTopic.substr(0, pos);
298 uint8_t qoS = qoSDefault;
299
300 if (pos != std::string::npos) {
301 try {
302 qoS = getQos(compositTopic.substr(pos + 2));
303 } catch (const std::logic_error& error) {
304 VLOG(0) << "[" << Color::Code::FG_RED << "Error" << Color::Code::FG_DEFAULT
305 << "] Malformed composit topic: " << compositTopic << "\n"
306 << error.what();
307 throw;
308 }
309 }
310 VLOG(0) << " t: " << static_cast<int>(qoS) << " | " << topic;
311 return iot::mqtt::Topic(topic, qoS);
312 });
313 sendSubscribe(topicList);
314
315 sendDisconnectFlag = false;
316 } catch (const std::logic_error&) {
317 }
318 }
319
321 VLOG(0) << "MQTT Publish";
322
323 std::size_t pos =
pubTopic.rfind(
"##");
324
325 const std::string topic =
pubTopic.substr(0, pos);
326
328
329 try {
330 if (pos != std::string::npos) {
331 try {
333 } catch (const std::logic_error& error) {
334 VLOG(0) << "[" << Color::Code::FG_RED << "Error" << Color::Code::FG_DEFAULT
335 <<
"] Malformed composit topic: " <<
pubTopic <<
"\n"
336 << error.what();
337 throw;
338 }
339 }
341
342 sendDisconnectFlag = qoS > 0 ? false : sendDisconnectFlag;
343 } catch (const std::logic_error&) {
344 }
345 }
346 if (sendDisconnectFlag) {
347 sendDisconnect();
348 }
349 } else {
350 sendDisconnect();
351 }
352 }
static uint8_t getQos(const std::string &qoSString)