SNode.C
Loading...
Searching...
No Matches
SubProtocol.hpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "core/socket/stream/SocketConnection.h"
21#include "iot/mqtt/SubProtocol.h"
22#include "log/Logger.h"
23#include "utils/system/signal.h"
24#include "web/websocket/SubProtocolContext.h"
25
26#ifndef DOXYGEN_SHOULD_SKIP_THIS
27
28#include "utils/hexdump.h"
29
30#include <algorithm>
31
32#endif // DOXYGEN_SHOULD_SKIP_THIS
33
34namespace iot::mqtt {
35
36 template <typename WSSubProtocolRole>
37 SubProtocol<WSSubProtocolRole>::SubProtocol(web::websocket::SubProtocolContext* subProtocolContext,
38 const std::string& name,
39 iot::mqtt::Mqtt* mqtt)
40 : WSSubProtocolRole(subProtocolContext, name, 0)
41 , iot::mqtt::MqttContext(mqtt)
42 , onReceivedFromPeerEvent([this]([[maybe_unused]] const utils::Timeval& currentTime) {
43 iot::mqtt::MqttContext::onReceivedFromPeer();
44
45 if (size > 0) {
46 onReceivedFromPeerEvent.span();
47 } else {
48 buffer.clear();
49 cursor = 0;
50 }
51 }) {
52 }
53
54 template <typename WSSubProtocolRole>
55 std::size_t SubProtocol<WSSubProtocolRole>::recv(char* chunk, std::size_t chunklen) {
56 std::size_t maxReturn = std::min(chunklen, size);
57
58 std::copy(buffer.data() + cursor, buffer.data() + cursor + maxReturn, chunk);
59
60 cursor += maxReturn;
61 size -= maxReturn;
62
63 return maxReturn;
64 }
65
66 template <typename WSSubProtocolRole>
67 void SubProtocol<WSSubProtocolRole>::send(const char* chunk, std::size_t chunklen) {
68 WSSubProtocolRole::sendMessage(chunk, chunklen);
69 }
70
71 template <typename WSSubProtocolRole>
72 void SubProtocol<WSSubProtocolRole>::end([[maybe_unused]] bool fatal) {
73 WSSubProtocolRole::sendClose();
74 }
75
76 template <typename WSSubProtocolRole>
77 void SubProtocol<WSSubProtocolRole>::close() {
78 WSSubProtocolRole::sendClose();
79 }
80
81 template <typename WSSubProtocolRole>
82 void SubProtocol<WSSubProtocolRole>::onConnected() {
83 LOG(INFO) << getSocketConnection()->getConnectionName() << " WSMQTT: connected:";
84 iot::mqtt::MqttContext::onConnected();
85 }
86
87 template <typename WSSubProtocolRole>
88 void SubProtocol<WSSubProtocolRole>::onMessageStart(int opCode) {
89 if (opCode == web::websocket::SubProtocolContext::OpCode::TEXT) {
90 LOG(ERROR) << getSocketConnection()->getConnectionName() << " WSMQTT: Wrong Opcode: " << opCode;
91 this->end(true);
92 } else {
93 LOG(DEBUG) << getSocketConnection()->getConnectionName() << " WSMQTT: Message START: " << opCode;
94 }
95 }
96
97 template <typename WSSubProtocolRole>
98 void SubProtocol<WSSubProtocolRole>::onMessageData(const char* chunk, std::size_t chunkLen) {
99 data.append(std::string(chunk, chunkLen));
100
101 LOG(DEBUG) << getSocketConnection()->getConnectionName() << " WebSocket: Frame Data:\n"
102 << std::string(32, ' ').append(utils::hexDump(std::vector<char>(chunk, chunk + chunkLen), 32));
103 }
104
105 template <typename WSSubProtocolRole>
106 void SubProtocol<WSSubProtocolRole>::onMessageEnd() {
107 LOG(DEBUG) << getSocketConnection()->getConnectionName() << " WSMQTT: Message END";
108
109 buffer.insert(buffer.end(), data.begin(), data.end());
110 size += data.size();
111 data.clear();
112
113 iot::mqtt::MqttContext::onReceivedFromPeer();
114
115 if (size > 0) {
116 onReceivedFromPeerEvent.span();
117 } else {
118 buffer.clear();
119 cursor = 0;
120 }
121 }
122
123 template <typename WSSubProtocolRole>
124 void SubProtocol<WSSubProtocolRole>::onMessageError(uint16_t errnum) {
125 LOG(ERROR) << getSocketConnection()->getConnectionName() << " WSMQTT: Message error: " << errnum;
126 }
127
128 template <typename WSSubProtocolRole>
129 void SubProtocol<WSSubProtocolRole>::onDisconnected() {
130 iot::mqtt::MqttContext::onDisconnected();
131 LOG(DEBUG) << getSocketConnection()->getConnectionName() << " WSMQTT: disconnected:";
132 }
133
134 template <typename WSSubProtocolRole>
135 bool SubProtocol<WSSubProtocolRole>::onSignal(int sig) {
136 bool ret = iot::mqtt::MqttContext::onSignal(sig);
137 LOG(INFO) << getSocketConnection()->getConnectionName() << " WSMQTT: exit due to '" << strsignal(sig) << "' (SIG"
138 << utils::system::sigabbrev_np(sig) << " = " << sig << ")";
139
140 this->sendClose();
141
142 return ret;
143 }
144
145 template <typename WSSubProtocolRole>
146 core::socket::stream::SocketConnection* SubProtocol<WSSubProtocolRole>::getSocketConnection() const {
147 return WSSubProtocolRole::subProtocolContext->getSocketConnection();
148 }
149
150} // namespace iot::mqtt
void onEvent(const utils::Timeval &currentTime) override
OnReceivedFromPeerEvent(const std::function< void(const utils::Timeval &)> &onReceivedFromPeer)
std::function< void(const utils::Timeval &)> onReceivedFromPeer
Definition SubProtocol.h:57
void onMessageError(uint16_t errnum) override
bool onSignal(int sig) override
void send(const char *chunk, std::size_t chunklen) override
void onDisconnected() override
void onConnected() override
void close() override
void end(bool fatal=false) override
core::socket::stream::SocketConnection * getSocketConnection() const override
std::size_t recv(char *chunk, std::size_t chunklen) override
std::vector< char > buffer
Definition SubProtocol.h:91
OnReceivedFromPeerEvent onReceivedFromPeerEvent
Definition SubProtocol.h:88
void onMessageData(const char *chunk, std::size_t chunkLen) override
SubProtocol(web::websocket::SubProtocolContext *subProtocolContext, const std::string &name, iot::mqtt::Mqtt *mqtt)
void onMessageEnd() override
void onMessageStart(int opCode) override
~SubProtocol() override=default