70 iot::mqtt::ControlPacketDeserializer* controlPacketDeserializer =
nullptr;
72 switch (fixedHeader.getType()) {
74 controlPacketDeserializer =
75 new iot::mqtt::server::packets::Connect(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
78 controlPacketDeserializer =
79 new iot::mqtt::server::packets::Publish(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
82 controlPacketDeserializer =
83 new iot::mqtt::server::packets::Puback(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
86 controlPacketDeserializer =
87 new iot::mqtt::server::packets::Pubrec(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
90 controlPacketDeserializer =
91 new iot::mqtt::server::packets::Pubrel(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
94 controlPacketDeserializer =
95 new iot::mqtt::server::packets::Pubcomp(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
98 controlPacketDeserializer =
99 new iot::mqtt::server::packets::Subscribe(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
102 controlPacketDeserializer =
103 new iot::mqtt::server::packets::Unsubscribe(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
106 controlPacketDeserializer =
107 new iot::mqtt::server::packets::Pingreq(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
110 controlPacketDeserializer =
111 new iot::mqtt::server::packets::Disconnect(fixedHeader.getRemainingLength(), fixedHeader.getFlags());
114 controlPacketDeserializer =
nullptr;
118 return controlPacketDeserializer;
// Sets up the broker-side session for `clientId` on this connection.
// Asks the broker whether an active session or a retained session already
// exists for `clientId` and logs which case applies. `keepAlive` is passed on
// unchanged to the two-argument initSession(...) calls below.
// Declared to return bool; the return statement is not visible in this listing.
// NOTE(review): this listing omits some original lines (brace-only lines,
// else lines and a few statements), so only the visible statements are described.
125 bool Mqtt::initSession(
const utils::Timeval& keepAlive) {
// Case 1: the broker already has an active session for this client id.
128 if (broker->hasActiveSession(clientId)) {
129 LOG(ERROR) << connectionName <<
" MQTT Broker: Existing session found for ClientId = " << clientId;
130 LOG(ERROR) << connectionName <<
" MQTT Broker: closing";
// NOTE(review): whatever follows the two log lines in this branch is not visible here.
135 }
else if (broker->hasRetainedSession(clientId)) {
// Case 2: the broker holds a retained session for this client id.
136 LOG(INFO) << connectionName <<
" MQTT Broker: Retained session found for ClientId = " << clientId;
// Sub-case "new": drop the client's subscriptions and start a fresh session.
// NOTE(review): the condition selecting between the "new" and "renew" paths is
// not visible; presumably it depends on the clean-session flag — confirm.
138 LOG(DEBUG) << connectionName <<
" New SessionId = " <<
this;
141 broker->unsubscribe(clientId);
142 initSession(broker->newSession(clientId,
this), keepAlive);
// Sub-case "renew": re-attach this connection to the retained session, then
// ask the broker to restart it.
144 LOG(DEBUG) << connectionName <<
" Renew SessionId = " <<
this;
147 initSession(broker->renewSession(clientId,
this), keepAlive);
148 broker->restartSession(clientId);
// Case 3: no session known for this client id; create a new one.
// NOTE(review): presumably the final else branch — the else line is not visible.
151 LOG(INFO) << connectionName <<
" MQTT Broker: No session found for ClientId = " << clientId;
152 LOG(INFO) << connectionName <<
" MQTT Broker: new SessionId = " <<
this;
156 initSession(broker->newSession(clientId,
this), keepAlive);
163 if (broker->isActiveSession(clientId,
this)) {
165 LOG(DEBUG) << connectionName <<
" MQTT Broker: Delete session for ClientId = " << clientId;
166 LOG(DEBUG) << connectionName <<
" MQTT Broker: SessionId = " <<
this;
167 broker->deleteSession(clientId);
169 LOG(DEBUG) << connectionName <<
" MQTT Broker: Retain session for ClientId = " << clientId;
170 LOG(DEBUG) << connectionName <<
" MQTT Broker: SessionId = " <<
this;
171 broker->retainSession(clientId);
// Handles a received CONNECT packet.
// Logs the packet's fields, checks protocol name, protocol level and client id,
// ends the connection via mqttContext->end(true) on the visible error paths,
// and copies the packet's values into protocol, level, reflect, clientId,
// willTopic, willMessage, username, password and willQoS.
// NOTE(review): this listing omits some original lines (brace-only lines,
// else lines and a few statements), so only the visible statements are described.
191 void Mqtt::_onConnect(
const iot::mqtt::server::
packets::Connect& connect) {
// Log the fixed part of the CONNECT payload.
192 LOG(INFO) << connectionName <<
" MQTT Broker: Protocol: " << connect.getProtocol();
// Cast to uint16_t so the level is streamed as a number.
193 LOG(INFO) << connectionName <<
" MQTT Broker: Version: " <<
static_cast<uint16_t>(connect.getLevel());
// Connect flags are printed as two zero-padded hex digits.
194 LOG(INFO) << connectionName <<
" MQTT Broker: ConnectFlags: 0x" << std::hex << std::setfill(
'0') << std::setw(2)
195 <<
static_cast<uint16_t>(connect.getConnectFlags()) << std::dec << std::setw(0);
196 LOG(INFO) << connectionName <<
" MQTT Broker: KeepAlive: " << connect.getKeepAlive();
197 LOG(INFO) << connectionName <<
" MQTT Broker: ClientID: " << connect.getClientId();
198 LOG(INFO) << connectionName <<
" MQTT Broker: CleanSession: " << connect.getCleanSession();
// Will details are only logged when the will flag is set.
200 if (connect.getWillFlag()) {
201 LOG(INFO) << connectionName <<
" MQTT Broker: WillTopic: " << connect.getWillTopic();
202 LOG(INFO) << connectionName <<
" MQTT Broker: WillMessage: " << connect.getWillMessage();
203 LOG(INFO) << connectionName <<
" MQTT Broker: WillQoS: " <<
static_cast<uint16_t>(connect.getWillQoS());
204 LOG(INFO) << connectionName <<
" MQTT Broker: WillRetain: " << connect.getWillRetain();
// Credentials are only logged when the corresponding flag is set.
206 if (connect.getUsernameFlag()) {
207 LOG(INFO) << connectionName <<
" MQTT Broker: Username: " << connect.getUsername();
// NOTE(review): this writes the client's password to the INFO log as received;
// consider removing or masking it.
209 if (connect.getPasswordFlag()) {
210 LOG(INFO) << connectionName <<
" MQTT Broker: Password: " << connect.getPassword();
// Validation 1: the protocol name must be exactly "MQTT"; otherwise end the connection.
213 if (connect.getProtocol() !=
"MQTT") {
214 LOG(ERROR) << connectionName <<
" MQTT Broker: Wrong Protocol: " << connect.getProtocol();
215 mqttContext->end(
true);
// Validation 2: protocol level mismatch (the condition line is not visible here).
// NOTE(review): unlike the "Version" log above, getLevel() is streamed here
// without static_cast<uint16_t>; if it returns a uint8_t it would print as a
// character — confirm.
217 LOG(ERROR) << connectionName <<
" MQTT Broker: Wrong Protocol Level: " <<
MQTT_VERSION_3_1_1 <<
" != " << connect.getLevel();
220 mqttContext->end(
true);
221 }
else if (connect.isFakedClientId() && !connect.getCleanSession()) {
// Validation 3: isFakedClientId() is true while clean session is not requested.
222 LOG(ERROR) << connectionName <<
" MQTT Broker: Resume session but no ClientId present";
225 mqttContext->end(
true);
// Copy the values carried by the packet into this object's state.
// NOTE(review): presumably the final else branch of the validation chain;
// the else line and some assignments are not visible.
228 protocol = connect.getProtocol();
229 level = connect.getLevel();
230 reflect = connect.getReflect();
235 clientId = connect.getClientId();
236 willTopic = connect.getWillTopic();
237 willMessage = connect.getWillMessage();
238 username = connect.getUsername();
239 password = connect.getPassword();
245 willQoS = connect.getWillQoS();
// NOTE(review): the condition guarding this end(true) is not visible;
// presumably taken when session initialisation fails — confirm.
252 mqttContext->end(
true);
// Handles a received SUBSCRIBE packet.
// A packet identifier of 0 is logged as an error and ends the connection.
// The visible remainder logs the identifier and every topic filter, registers
// each filter with the broker, sends the collected return codes with
// sendSuback() and finally calls onSubscribe().
// NOTE(review): the else and closing-brace lines are not visible in this
// listing; the statements after the error path presumably sit in an else branch.
265 void Mqtt::_onSubscribe(
const iot::mqtt::server::
packets::Subscribe& subscribe) {
// NOTE(review): the message says "PackageIdentifier" while the getter and the
// debug log below say "PacketIdentifier".
266 if (subscribe.getPacketIdentifier() == 0) {
267 LOG(ERROR) << connectionName <<
" MQTT Broker: PackageIdentifier missing";
268 mqttContext->end(
true);
// Packet identifier is printed as four zero-padded hex digits.
270 LOG(DEBUG) << connectionName <<
" MQTT Broker: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
271 << subscribe.getPacketIdentifier();
// First pass: log every requested topic filter with its QoS (cast so it prints as a number).
273 for (
const iot::mqtt::Topic& topic : subscribe.getTopics()) {
274 LOG(INFO) << connectionName <<
" MQTT Broker: Topic filter: '" << topic.getName()
275 <<
"', QoS: " <<
static_cast<uint16_t>(topic.getQoS());
// Second pass: subscribe each filter at the broker and collect one return
// code per topic, in the order of subscribe.getTopics().
278 std::list<uint8_t> returnCodes;
279 for (
const iot::mqtt::Topic& topic : subscribe.getTopics()) {
280 const uint8_t returnCode = broker->subscribe(clientId, topic.getName(), topic.getQoS());
281 returnCodes.push_back(returnCode);
// Answer with the same packet identifier and the collected return codes.
284 sendSuback(subscribe.getPacketIdentifier(), returnCodes);
// Call onSubscribe() with the same packet after the SUBACK has been sent.
286 onSubscribe(subscribe);
// Handles a received UNSUBSCRIBE packet.
// A packet identifier of 0 is logged as an error and ends the connection.
// The visible remainder logs the identifier and every topic, removes each
// topic subscription for `clientId` at the broker and finally calls
// onUnsubscribe().
// NOTE(review): the else and closing-brace lines, and the statements between
// the second loop and onUnsubscribe(), are not visible in this listing.
290 void Mqtt::_onUnsubscribe(
const iot::mqtt::server::
packets::Unsubscribe& unsubscribe) {
// NOTE(review): the message says "PackageIdentifier" while the getter and the
// debug log below say "PacketIdentifier".
291 if (unsubscribe.getPacketIdentifier() == 0) {
292 LOG(ERROR) << connectionName <<
" MQTT Broker: PackageIdentifier missing";
293 mqttContext->end(
true);
// Packet identifier is printed as four zero-padded hex digits.
295 LOG(DEBUG) << connectionName <<
" MQTT Broker: PacketIdentifier: 0x" << std::hex << std::setfill(
'0') << std::setw(4)
296 << unsubscribe.getPacketIdentifier();
// First pass: log every topic named in the packet.
298 for (
const std::string& topic : unsubscribe.getTopics()) {
299 LOG(INFO) << connectionName <<
" MQTT Broker: Topic: " << topic;
// Second pass: remove each of those subscriptions for this client at the broker.
302 for (
const std::string& topic : unsubscribe.getTopics()) {
303 broker->unsubscribe(clientId, topic);
// Call onUnsubscribe() with the same packet.
308 onUnsubscribe(unsubscribe);