diff --git a/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino b/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino index c564640..15c456d 100644 --- a/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino +++ b/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino @@ -62,6 +62,16 @@ void onMqttConnect(bool sessionPresent) { uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3"); Serial.print("Publishing at QoS 2, packetId: "); Serial.println(packetIdPub2); + + packetIdSub = mqttClient.subscribe("Topic3/Example/Test", 2, onSpecificMqttMessage); + Serial.print("Subscribing at QoS 2 with specified callback, packetId: "); + Serial.println(packetIdSub); + mqttClient.publish("Topic1/Example/Test", 0, true, "Test"); + Serial.println("Publishing \"Test\" to \"Topic1/Example/Test\" at QoS 0"); + mqttClient.publish("Topic2/Example/Test", 0, true, "Test"); + Serial.println("Publishing \"Test\" to \"Topic2/Example/Test\" at QoS 0"); + mqttClient.publish("Topic3/Example/Test", 0, true, "Test"); + Serial.println("Publishing \"Test\" to \"Topic3/Example/Test\" at QoS 0"); } void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) { @@ -104,6 +114,23 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties Serial.println(total); } +void onSpecificMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { + Serial.print("Publish received for specific topic: "); + Serial.println(topic); + Serial.print(" qos: "); + Serial.println(properties.qos); + Serial.print(" dup: "); + Serial.println(properties.dup); + Serial.print(" retain: "); + Serial.println(properties.retain); + Serial.print(" len: "); + Serial.println(len); + Serial.print(" index: "); + Serial.println(index); + Serial.print(" total: "); + Serial.println(total); +} + void onMqttPublish(uint16_t packetId) { Serial.println("Publish acknowledged."); Serial.print(" packetId: "); @@ -125,11 +152,12 @@ void setup() { mqttClient.onSubscribe(onMqttSubscribe); mqttClient.onUnsubscribe(onMqttUnsubscribe); mqttClient.onMessage(onMqttMessage); + mqttClient.onMessage(onSpecificMqttMessage, "Topic1/#"); // Optional - Overloaded example + mqttClient.onFilteredMessage(onSpecificMqttMessage, "Topic2/+/Test"); // Optional - Method for verbose reading mqttClient.onPublish(onMqttPublish); mqttClient.setServer(MQTT_HOST, MQTT_PORT); connectToWifi(); } -void loop() { -} +void loop() {} diff --git a/examples/FullyFeatured-ESP8266/FullyFeatured-ESP8266.ino b/examples/FullyFeatured-ESP8266/FullyFeatured-ESP8266.ino index 82f981d..4fd193f 100644 --- a/examples/FullyFeatured-ESP8266/FullyFeatured-ESP8266.ino +++ b/examples/FullyFeatured-ESP8266/FullyFeatured-ESP8266.ino @@ -40,6 +40,7 @@ void onMqttConnect(bool sessionPresent) { Serial.println("Connected to MQTT."); Serial.print("Session present: "); Serial.println(sessionPresent); + uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2); Serial.print("Subscribing at QoS 2, packetId: "); Serial.println(packetIdSub); @@ -51,6 +52,16 @@ void onMqttConnect(bool sessionPresent) { uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3"); Serial.print("Publishing at QoS 2, packetId: "); Serial.println(packetIdPub2); + + packetIdSub = mqttClient.subscribe("Topic3/Example/Test", 2, onSpecificMqttMessage); + Serial.print("Subscribing at QoS 2 with specified callback, packetId: "); + Serial.println(packetIdSub); + mqttClient.publish("Topic1/Example/Test", 0, true, "Test"); + Serial.println("Publishing \"Test\" to \"Topic1/Example/Test\" at QoS 0"); + mqttClient.publish("Topic2/Example/Test", 0, true, "Test"); + Serial.println("Publishing \"Test\" to \"Topic2/Example/Test\" at QoS 0"); + mqttClient.publish("Topic3/Example/Test", 0, true, "Test"); + Serial.println("Publishing \"Test\" to \"Topic3/Example/Test\" at QoS 0"); } void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) { @@ -93,6 +104,23 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties Serial.println(total); } +void onSpecificMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { + Serial.print("Publish received for specific topic: "); + Serial.println(topic); + Serial.print(" qos: "); + Serial.println(properties.qos); + Serial.print(" dup: "); + Serial.println(properties.dup); + Serial.print(" retain: "); + Serial.println(properties.retain); + Serial.print(" len: "); + Serial.println(len); + Serial.print(" index: "); + Serial.println(index); + Serial.print(" total: "); + Serial.println(total); +} + void onMqttPublish(uint16_t packetId) { Serial.println("Publish acknowledged."); Serial.print(" packetId: "); @@ -112,11 +140,12 @@ void setup() { mqttClient.onSubscribe(onMqttSubscribe); mqttClient.onUnsubscribe(onMqttUnsubscribe); mqttClient.onMessage(onMqttMessage); + mqttClient.onMessage(onSpecificMqttMessage, "Topic1/#"); // Optional - Overloaded example + mqttClient.onFilteredMessage(onSpecificMqttMessage, "Topic2/+/Test"); // Optional - Method for verbose reading mqttClient.onPublish(onMqttPublish); mqttClient.setServer(MQTT_HOST, MQTT_PORT); connectToWifi(); } -void loop() { -} +void loop() {} \ No newline at end of file diff --git a/keywords.txt b/keywords.txt index 98c4bf3..956a6ac 100644 --- a/keywords.txt +++ b/keywords.txt @@ -25,6 +25,7 @@ onDisconnect KEYWORD2 onSubscribe KEYWORD2 onUnsubscribe KEYWORD2 onMessage KEYWORD2 +onFilteredMessage KEYWORD2 onPublish KEYWORD2 connected KEYWORD2 diff --git a/src/AsyncMqttClient.cpp b/src/AsyncMqttClient.cpp index f62e1ef..50d5102 100644 --- a/src/AsyncMqttClient.cpp +++ b/src/AsyncMqttClient.cpp @@ -50,6 +50,9 @@ AsyncMqttClient::AsyncMqttClient() AsyncMqttClient::~AsyncMqttClient() { delete _currentParsedPacket; delete[] _parsingInformation.topicBuffer; + for (auto callback : _onMessageUserCallbacks) { + delete callback.first; + } #ifdef ESP32 vSemaphoreDelete(_xSemaphore); #endif @@ -140,8 +143,14 @@ AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsu return *this; } -AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) { - _onMessageUserCallbacks.push_back(callback); +AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic) { + onFilteredMessage(callback, _userTopic); + return *this; +} + +AsyncMqttClient& AsyncMqttClient::onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic) { + // _onMessageUserCallbacks.push_back(AsyncMqttClientInternals::onFilteredMessageUserCallback(_userTopic, callback)); + _onMessageUserCallbacks.push_back(AsyncMqttClientInternals::onFilteredMessageUserCallback(strdup(_userTopic), callback)); // leakage issue return *this; } @@ -520,7 +529,47 @@ void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool d properties.dup = dup; properties.retain = retain; - for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total); + for (auto callback : _onMessageUserCallbacks) { + bool mqttTopicMatch = false; + + if (strcmp(callback.first, "#") == 0 || strcmp(callback.first, topic) == 0) { + mqttTopicMatch = true; + } + else { + char* messageTopic = strdup(topic); + char* userTopic = strdup(callback.first); + char* messageSubTopic = strtok_r (messageTopic, "/", &messageTopic); + char* userSubTopic = strtok_r (userTopic, "/", &userTopic); + + while (messageSubTopic != NULL || userSubTopic != NULL) { + if (messageSubTopic != NULL && userSubTopic == NULL) { + mqttTopicMatch = false; + break; + } + else if (messageSubTopic == NULL && userSubTopic != NULL) { + mqttTopicMatch = false; + break; + } + else if (mqttTopicMatch && strcmp(userSubTopic, "#") == 0) { + mqttTopicMatch = true; + break; + } + else if (strcmp(messageSubTopic, userSubTopic) == 0 || strcmp(userSubTopic, "+") == 0) { + messageSubTopic = strtok_r (messageTopic, "/", &messageTopic); + userSubTopic = strtok_r (userTopic, "/", &userTopic); + mqttTopicMatch = true; + } + else { + mqttTopicMatch = false; + break; + } + } + } + + if (mqttTopicMatch) { + callback.second(topic, payload, properties, len, index, total); + } + } } } @@ -765,6 +814,11 @@ uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { return packetId; } +uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback) { + onFilteredMessage(callback, topic); + subscribe(topic, qos); +} + uint16_t AsyncMqttClient::unsubscribe(const char* topic) { if (!_connected) return 0; diff --git a/src/AsyncMqttClient.hpp b/src/AsyncMqttClient.hpp index af8332b..57467f4 100644 --- a/src/AsyncMqttClient.hpp +++ b/src/AsyncMqttClient.hpp @@ -68,13 +68,15 @@ class AsyncMqttClient { AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); - AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback); + AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic = "#"); + AsyncMqttClient& onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic); AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); bool connected() const; void connect(); void disconnect(bool force = false); uint16_t subscribe(const char* topic, uint8_t qos); + uint16_t subscribe(const char* topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback); uint16_t unsubscribe(const char* topic); uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); @@ -116,7 +118,7 @@ class AsyncMqttClient { std::vector _onDisconnectUserCallbacks; std::vector _onSubscribeUserCallbacks; std::vector _onUnsubscribeUserCallbacks; - std::vector _onMessageUserCallbacks; + std::vector _onMessageUserCallbacks; std::vector _onPublishUserCallbacks; AsyncMqttClientInternals::ParsingInformation _parsingInformation; diff --git a/src/AsyncMqttClient/Callbacks.hpp b/src/AsyncMqttClient/Callbacks.hpp index 2a4a09f..b0d7222 100644 --- a/src/AsyncMqttClient/Callbacks.hpp +++ b/src/AsyncMqttClient/Callbacks.hpp @@ -12,6 +12,8 @@ typedef std::function OnDisconnect typedef std::function OnSubscribeUserCallback; typedef std::function OnUnsubscribeUserCallback; typedef std::function OnMessageUserCallback; +// typedef std::pair> onFilteredMessageUserCallback; +typedef std::pair> onFilteredMessageUserCallback; typedef std::function OnPublishUserCallback; // internal callbacks