Skip to content

Commit b6e2021

Browse files
authored
Merge pull request #1555 from aikitori/mqtt-monitor
Mqtt monitor
2 parents d38825f + af1dc0a commit b6e2021

File tree

3 files changed

+241
-0
lines changed

3 files changed

+241
-0
lines changed

docs/monitors/mqtt_client.rst

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
mqtt_client - monitor a mqtt topic
2+
^^^^^^^^^^^^^^^^^^
3+
4+
Subscipe to a MQTT topic and compare the payload with a success state
5+
6+
.. confval:: broker
7+
8+
:type: string
9+
:required: true
10+
11+
the hostname or IP of the broker
12+
13+
.. confval:: port
14+
15+
:type: int
16+
:required: false
17+
:default: ``1883``
18+
19+
The port of the broker
20+
21+
.. confval:: username
22+
23+
:type: string
24+
:required: false
25+
:default: ````
26+
27+
The mqtt username
28+
29+
.. confval:: password
30+
31+
:type: string
32+
:required: false
33+
:default: ````
34+
35+
The mqtt password
36+
37+
.. confval:: tls
38+
39+
:type: bool
40+
:required: false
41+
:default: ``false``
42+
43+
Use tls
44+
45+
.. confval:: ca_cert
46+
47+
:type: string
48+
:required: false
49+
:default: ````
50+
51+
Path to the CA cert. Otherwise, use the system CAs
52+
53+
.. confval:: topic
54+
55+
:type: string
56+
:required: true
57+
:default: ````
58+
59+
The topic which simplemonitor will subscribe to
60+
61+
.. confval:: success_state
62+
63+
:type: string
64+
:required: true
65+
:default: ````
66+
67+
The success state of the payload. Can be a Number, a string or a comparison (e.g. <10,>10,0<x<10)

simplemonitor/Monitors/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
MonitorZap,
2222
MonitorZpool,
2323
)
24+
from .mqtt import MonitorMQTT
2425
from .network import (
2526
MonitorDNS,
2627
MonitorHost,
@@ -43,6 +44,7 @@
4344
from .unifi import MonitorUnifiFailover, MonitorUnifiFailoverWatchdog
4445

4546
__all__ = [
47+
"MonitorMQTT",
4648
"CompoundMonitor",
4749
"MonitorApcupsd",
4850
"MonitorArloCamera",

simplemonitor/Monitors/mqtt.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import threading
2+
3+
import paho.mqtt.client as mqtt
4+
5+
from .monitor import Monitor, register
6+
7+
8+
class MQTTBrokerManager:
9+
"""Manager for MQTT connections and subscriptions"""
10+
11+
_managers: dict[
12+
tuple[str, int], "MQTTBrokerManager"
13+
] = {} # Shared dictionary of brokers: {(broker, port): MQTTBrokerManager}
14+
15+
def __new__(
16+
cls, broker, port, username=None, password=None, tls_enabled=False, ca_cert=None
17+
):
18+
key = (broker, port) # Unique key based on broker and port
19+
if key not in cls._managers:
20+
# Create a new instance if it doesn't exist for this broker
21+
instance = super().__new__(cls)
22+
instance._init(broker, port, username, password, tls_enabled, ca_cert)
23+
cls._managers[key] = instance
24+
return cls._managers[key]
25+
26+
def _init(self, broker, port, username, password, tls_enabled, ca_cert):
27+
"""Initialize the MQTT client."""
28+
self.client = mqtt.Client()
29+
self.lock = threading.Lock()
30+
self.topic_callbacks = {} # Map of topic -> list of callback functions
31+
self.received_data = {} # Latest payload for each topic
32+
33+
# Authentication
34+
if username and password:
35+
self.client.username_pw_set(username, password)
36+
37+
# TLS configuration
38+
if tls_enabled:
39+
if ca_cert:
40+
self.client.tls_set(ca_certs=ca_cert)
41+
print(f"TLS enabled for {broker} with CA certificate: {ca_cert}")
42+
else:
43+
self.client.tls_set()
44+
print(f"TLS enabled for {broker} with default CA certificates.")
45+
self.client.tls_insecure_set(False)
46+
47+
# MQTT event handlers
48+
self.client.on_connect = self.on_connect
49+
self.client.on_message = self.on_message
50+
self.client.on_disconnect = self.on_disconnect
51+
52+
# Start MQTT client in a background thread
53+
self.thread = threading.Thread(target=self.start_client, args=(broker, port))
54+
self.thread.daemon = True
55+
self.thread.start()
56+
57+
def on_connect(self, client, userdata, flags, rc):
58+
"""Callback when connected to the broker."""
59+
if rc == 0:
60+
print("Connected to MQTT broker.")
61+
for topic in self.topic_callbacks:
62+
client.subscribe(topic)
63+
print(f"Subscribed to topic: {topic}")
64+
else:
65+
print(f"Connection failed with code {rc}")
66+
67+
def on_message(self, client, userdata, msg):
68+
"""Callback when a message is received."""
69+
topic = msg.topic
70+
payload = msg.payload.decode()
71+
72+
with self.lock:
73+
self.received_data[topic] = payload
74+
if topic in self.topic_callbacks:
75+
for callback in self.topic_callbacks[topic]:
76+
callback(payload)
77+
78+
def on_disconnect(self, client, userdata, rc):
79+
"""Callback when disconnected from the broker."""
80+
print("Disconnected from MQTT broker.")
81+
82+
def start_client(self, broker, port):
83+
"""Start the MQTT client loop."""
84+
try:
85+
self.client.connect(broker, port)
86+
self.client.loop_forever()
87+
except Exception as e:
88+
print(f"Error starting MQTT client: {e}")
89+
90+
def subscribe(self, topic, callback):
91+
"""Subscribe to a topic and register a callback."""
92+
with self.lock:
93+
if topic not in self.topic_callbacks:
94+
self.topic_callbacks[topic] = []
95+
self.client.subscribe(topic)
96+
self.topic_callbacks[topic].append(callback)
97+
98+
def get_latest_payload(self, topic):
99+
"""Retrieve the latest payload for a given topic."""
100+
with self.lock:
101+
return self.received_data.get(topic, None)
102+
103+
104+
# Monitor for multiple brokers
105+
@register
106+
class MonitorMQTT(Monitor):
107+
"""Monitor for MQTT topics using shared or unique broker connections."""
108+
109+
monitor_type = "mqtt_client"
110+
111+
def __init__(self, name, config_options):
112+
super().__init__(name, config_options)
113+
114+
# Monitor configuration
115+
self.topic = self.get_config_option("topic", required=True)
116+
self.success_state = self.get_config_option("success_state", required=True)
117+
self.last_payload = None
118+
self.status = "UNKNOWN"
119+
120+
# Broker configuration
121+
broker = self.get_config_option("broker", required=True)
122+
port = self.get_config_option("port", required_type="int", default=1883)
123+
username = self.get_config_option("username", required=False)
124+
password = self.get_config_option("password", required=False)
125+
tls_enabled = self.get_config_option("tls", required_type="bool", default=False)
126+
ca_cert = self.get_config_option("ca_cert", required=False)
127+
128+
# Get or create the MQTT broker manager
129+
self.mqtt_manager = MQTTBrokerManager(
130+
broker, port, username, password, tls_enabled, ca_cert
131+
)
132+
133+
# Subscribe to the topic with a callback
134+
self.mqtt_manager.subscribe(self.topic, self.on_message_received)
135+
136+
def on_message_received(self, payload):
137+
print(f"[{self.name}] Received message: {payload}")
138+
self.last_payload = payload
139+
self.evaluate_payload(payload)
140+
141+
def evaluate_payload(self, payload):
142+
"""Evaluate the payload against the success_state."""
143+
try:
144+
numeric_payload = float(payload)
145+
condition = self.success_state.strip()
146+
if condition.startswith("<"):
147+
threshold = float(condition[1:])
148+
self.status = "OK" if numeric_payload < threshold else "FAILED"
149+
elif condition.startswith(">"):
150+
threshold = float(condition[1:])
151+
self.status = "OK" if numeric_payload > threshold else "FAILED"
152+
elif "<" in condition and "x" in condition:
153+
parts = condition.split("<")
154+
lower = float(parts[0].strip())
155+
upper = float(parts[2].strip())
156+
self.status = "OK" if lower < numeric_payload < upper else "FAILED"
157+
else:
158+
self.status = "OK" if numeric_payload == float(condition) else "FAILED"
159+
except ValueError:
160+
self.status = "OK" if payload == self.success_state else "FAILED"
161+
162+
def run_test(self):
163+
if self.status == "OK":
164+
self.record_success(
165+
f"Payload '{self.last_payload}' matched condition '{self.success_state}'."
166+
)
167+
elif self.status == "UNKNOWN":
168+
self.record_skip(f"Topic '{self.topic}' did not received any messages yet.")
169+
else:
170+
self.record_fail(
171+
f"Payload '{self.last_payload}' did not match condition '{self.success_state}'."
172+
)

0 commit comments

Comments
 (0)