Skip to content

Commit 9fad312

Browse files
author
brentru
committed
add MQTTMatcher, expose MQTT's on_message, allow multiple callbacks
1 parent c546f2e commit 9fad312

File tree

4 files changed

+263
-3
lines changed

4 files changed

+263
-3
lines changed

adafruit_minimqtt/__init__.py

Whitespace-only changes.

adafruit_minimqtt.py renamed to adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import time
4444
from random import randint
4545
from micropython import const
46+
from .matcher import MQTTMatcher
4647
import adafruit_logging as logging
4748

4849
__version__ = "0.0.0-auto.0"
@@ -173,8 +174,9 @@ def __init__(
173174
self._lw_retain = False
174175
# List of subscribed topics, used for tracking
175176
self._subscribed_topics = []
177+
self._on_message_filtered = MQTTMatcher()
176178
# Server callbacks
177-
self.on_message = None
179+
self._on_message = None
178180
self.on_connect = None
179181
self.on_disconnect = None
180182
self.on_publish = None
@@ -218,6 +220,49 @@ def will_set(self, topic=None, payload=None, qos=0, retain=False):
218220
self._lw_msg = payload
219221
self._lw_retain = retain
220222

223+
def add_topic_callback(self, mqtt_topic, callback_method):
224+
"""Registers a callback_method for a specific MQTT topic.
225+
:param str mqtt_topic: MQTT topic.
226+
:param str callback_method: Name of callback method.
227+
228+
"""
229+
print("adding topic callback...")
230+
if mqtt_topic is None or callback_method is None:
231+
raise ValueError("MQTT topic and callback method must both be defined.")
232+
self._on_message_filtered[mqtt_topic] = callback_method
233+
234+
def remove_topic_callback(self, mqtt_topic):
235+
"""Removes a registered callback method.
236+
:param str mqtt_topic: MQTT topic.
237+
238+
"""
239+
if mqtt_topic is None:
240+
raise ValueError("MQTT Topic must be defined.")
241+
pass
242+
243+
@property
244+
def on_message(self):
245+
"""Called when a new message has been received on a subscribed topic.
246+
247+
Expected method signature is:
248+
on_message(client, topic, message)
249+
"""
250+
return self._on_message
251+
252+
@on_message.setter
253+
def on_message(self, method):
254+
self._on_message = method
255+
256+
def _handle_on_message(self, client, topic, message):
257+
matched = False
258+
if topic is not None:
259+
for callback in self._on_message_filtered.iter_match(topic):
260+
callback(client, topic, message) # on_msg with callback
261+
matched = True
262+
263+
if matched == False and self.on_message: # regular on_message
264+
self.on_message(client, topic, message)
265+
221266
# pylint: disable=too-many-branches, too-many-statements, too-many-locals
222267
def connect(self, clean_session=True):
223268
"""Initiates connection with the MQTT Broker.
@@ -666,7 +711,7 @@ def _wait_for_msg(self, timeout=30):
666711
sz -= 0x02
667712
msg = self._sock.recv(sz)
668713
if self.on_message is not None:
669-
self.on_message(self, topic, str(msg, "utf-8"))
714+
self._handle_on_message(self, topic, str(msg, "utf-8"))
670715
if res[0] & 0x06 == 0x02:
671716
pkt = bytearray(b"\x40\x02\0\0")
672717
struct.pack_into("!H", pkt, 2, pid)
@@ -751,7 +796,7 @@ def mqtt_msg(self, msg_size):
751796
if msg_size < MQTT_MSG_MAX_SZ:
752797
self._msg_size_lim = msg_size
753798

754-
# Logging
799+
### Logging ###
755800
def attach_logger(self, logger_name="log"):
756801
"""Initializes and attaches a logger to the MQTTClient.
757802
:param str logger_name: Name of the logger instance

adafruit_minimqtt/matcher.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
class MQTTMatcher(object):
2+
"""Intended to manage topic filters including wildcards.
3+
4+
Internally, MQTTMatcher use a prefix tree (trie) to store
5+
values associated with filters, and has an iter_match()
6+
method to iterate efficiently over all filters that match
7+
some topic name."""
8+
9+
class Node(object):
10+
__slots__ = "_children", "_content"
11+
12+
def __init__(self):
13+
self._children = {}
14+
self._content = None
15+
16+
def __init__(self):
17+
self._root = self.Node()
18+
19+
def __setitem__(self, key, value):
20+
"""Add a topic filter :key to the prefix tree
21+
and associate it to :value"""
22+
node = self._root
23+
for sym in key.split("/"):
24+
node = node._children.setdefault(sym, self.Node())
25+
node._content = value
26+
27+
def __getitem__(self, key):
28+
"""Retrieve the value associated with some topic filter :key"""
29+
try:
30+
node = self._root
31+
for sym in key.split("/"):
32+
node = node._children[sym]
33+
if node._content is None:
34+
raise KeyError(key)
35+
return node._content
36+
except KeyError:
37+
raise KeyError(key)
38+
39+
def __delitem__(self, key):
40+
"""Delete the value associated with some topic filter :key"""
41+
lst = []
42+
try:
43+
parent, node = None, self._root
44+
for k in key.split("/"):
45+
parent, node = node, node._children[k]
46+
lst.append((parent, k, node))
47+
# TODO
48+
node._content = None
49+
except KeyError:
50+
raise KeyError(key)
51+
else: # cleanup
52+
for parent, k, node in reversed(lst):
53+
if node._children or node._content is not None:
54+
break
55+
del parent._children[k]
56+
57+
def iter_match(self, topic):
58+
"""Return an iterator on all values associated with filters
59+
that match the :topic"""
60+
lst = topic.split("/")
61+
normal = not topic.startswith("$")
62+
63+
def rec(node, i=0):
64+
if i == len(lst):
65+
if node._content is not None:
66+
yield node._content
67+
else:
68+
part = lst[i]
69+
if part in node._children:
70+
for content in rec(node._children[part], i + 1):
71+
yield content
72+
if "+" in node._children and (normal or i > 0):
73+
for content in rec(node._children["+"], i + 1):
74+
yield content
75+
if "#" in node._children and (normal or i > 0):
76+
content = node._children["#"]._content
77+
if content is not None:
78+
yield content
79+
80+
return rec(self._root)
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import time
2+
import board
3+
import busio
4+
from digitalio import DigitalInOut
5+
import neopixel
6+
from adafruit_esp32spi import adafruit_esp32spi
7+
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
8+
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
9+
import adafruit_minimqtt.adafruit_minimqtt as MQTT
10+
11+
### WiFi ###
12+
13+
# Get wifi details and more from a secrets.py file
14+
try:
15+
from secrets import secrets
16+
except ImportError:
17+
print("WiFi secrets are kept in secrets.py, please add them there!")
18+
raise
19+
20+
esp32_cs = DigitalInOut(board.D13)
21+
esp32_ready = DigitalInOut(board.D11)
22+
esp32_reset = DigitalInOut(board.D12)
23+
24+
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
25+
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
26+
"""Use below for Most Boards"""
27+
status_light = neopixel.NeoPixel(
28+
board.NEOPIXEL, 1, brightness=0.2
29+
) # Uncomment for Most Boards
30+
"""Uncomment below for ItsyBitsy M4"""
31+
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
32+
# Uncomment below for an externally defined RGB LED
33+
# import adafruit_rgbled
34+
# from adafruit_esp32spi import PWMOut
35+
# RED_LED = PWMOut.PWMOut(esp, 26)
36+
# GREEN_LED = PWMOut.PWMOut(esp, 27)
37+
# BLUE_LED = PWMOut.PWMOut(esp, 25)
38+
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
39+
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
40+
41+
### Topic Setup ###
42+
43+
# MQTT Topic
44+
# Use this topic if you'd like to connect to a standard MQTT broker
45+
mqtt_topic = "test/topic"
46+
47+
# Adafruit IO-style Topic
48+
# Use this topic if you'd like to connect to io.adafruit.com
49+
# mqtt_topic = 'aio_user/feeds/temperature'
50+
51+
### Code ###
52+
53+
# Define callback methods which are called when events occur
54+
# pylint: disable=unused-argument, redefined-outer-name
55+
def connect(client, userdata, flags, rc):
56+
# This function will be called when the client is connected
57+
# successfully to the broker.
58+
print("Connected to MQTT Broker!")
59+
print("Flags: {0}\n RC: {1}".format(flags, rc))
60+
61+
62+
def disconnect(client, userdata, rc):
63+
# This method is called when the client disconnects
64+
# from the broker.
65+
print("Disconnected from MQTT Broker!")
66+
67+
68+
def subscribe(client, userdata, topic, granted_qos):
69+
# This method is called when the client subscribes to a new feed.
70+
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
71+
72+
73+
def unsubscribe(client, userdata, topic, pid):
74+
# This method is called when the client unsubscribes from a feed.
75+
print("Unsubscribed from {0} with PID {1}".format(topic, pid))
76+
77+
78+
def publish(client, userdata, topic, pid):
79+
# This method is called when the client publishes data to a feed.
80+
print("Published to {0} with PID {1}".format(topic, pid))
81+
82+
def on_battery_msg(client, topic, message):
83+
print("Battery Level: {}v".format(message))
84+
85+
def on_message(client, topic, message):
86+
"""Method callled when a client's subscribed feed has a new
87+
value.
88+
:param str topic: The topic of the feed with a new value.
89+
:param str message: The new value
90+
"""
91+
print("New message on topic {0}: {1}".format(topic, message))
92+
93+
94+
# Connect to WiFi
95+
print("Connecting to WiFi...")
96+
wifi.connect()
97+
print("Connected!")
98+
99+
# Initialize MQTT interface with the esp interface
100+
MQTT.set_socket(socket, esp)
101+
102+
# Set up a MiniMQTT Client
103+
client = MQTT.MQTT(broker=secrets["broker"], port=1883)
104+
105+
# Connect callback handlers to client
106+
client.on_connect = connect
107+
client.on_disconnect = disconnect
108+
client.on_subscribe = subscribe
109+
client.on_unsubscribe = unsubscribe
110+
client.on_publish = publish
111+
112+
client.on_message = on_message
113+
client.add_topic_callback("sensors/batteryLevel", on_battery_msg)
114+
115+
# Connect the client to the MQTT broker.
116+
print("Connecting to MQTT broker...")
117+
client.connect()
118+
119+
# Subscribe to all notifications on the sensors/
120+
client.subscribe("sensors/#", 1)
121+
122+
# Start a blocking message loop...
123+
# NOTE: NO code below this loop will execute
124+
# NOTE: Network reconnection is handled within this loop
125+
while True:
126+
try:
127+
client.loop()
128+
except (ValueError, RuntimeError) as e:
129+
print("Failed to get data, retrying\n", e)
130+
wifi.reset()
131+
client.reconnect()
132+
continue
133+
time.sleep(1)
134+
135+

0 commit comments

Comments
 (0)