Lightweight helper utilities for working with MQTT via Paho, with convenient wrappers for:
- Connecting and subscribing to topics, including retained-message handling
- Publishing one or many messages with consistent metadata and timestamps
- Reading “last/most recent” messages with timeout-based collection and optional type conversion
- Developer helpers for JSON pretty-printing, deep updates, and logging configuration

- Repository: https://github.com/vroomfondel/mqttstuff
- Package: mqttstuff

MQTTStuff provides a higher-level interface over paho-mqtt to simplify common patterns:
- A MosquittoClientWrapper to configure, connect, subscribe, and publish with minimal boilerplate
- An MQTTLastDataReader utility to quickly retrieve the most recent messages for one or more topics
- An MWMqttMessage normalized data format for sending/receiving data (with optional metadata) from/to IoT devices/sensors

Options:
- From source (editable):

      python -m venv .venv && source .venv/bin/activate
      pip install -r requirements-dev.txt
      pip install -e .

- Build distributions with Hatch:

      make pypibuild

  Artifacts are created under dist/

Simple publish and subscribe using the wrapper:

    from mqttstuff import MosquittoClientWrapper

    client = MosquittoClientWrapper(
        host="localhost", port=1883, username="user", password="pass",
        topics=["test/topic"],
    )

    def on_any_message(msg, userdata):
        # msg is an instance of MWMqttMessage with convenient fields
        print(msg.topic, msg.value)

    client.set_on_msg_callback(on_any_message, rettype="valuemsg")
    client.connect_and_start_loop_forever()

    # elsewhere or in another process
    client.publish_one("test/topic", {"hello": "world"}, retain=False)

Read last retained or recent messages with a timeout:

    from mqttstuff import MQTTLastDataReader

    data = MQTTLastDataReader.get_most_recent_data_with_timeout(
        host="localhost", port=1883, username="user", password="pass",
        topics=["tele/+/STATE", "stat/+/STATUS"],
        retained="only",    # "yes" | "no" | "only"
        rettype="str_raw",  # or "json", "valuemsg", "str", "int", "float"
    )
    print(data)

This repository no longer provides a central config.py. Pass your MQTT connection settings directly to the wrapper (see examples above) or manage configuration in your own application code.
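
One way to manage configuration in your own application code is a small settings object filled from environment variables. The following is a minimal sketch under stated assumptions: MqttSettings, from_env, and the MQTT_* variable names are hypothetical and not part of mqttstuff; only the host/port/username/password keyword names are taken from the examples above.

```python
import os
from dataclasses import asdict, dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MqttSettings:
    """Hypothetical settings holder; not provided by mqttstuff."""

    host: str = "localhost"
    port: int = 1883
    username: Optional[str] = None
    password: Optional[str] = None

    @classmethod
    def from_env(
        cls, env: Optional[Mapping[str, str]] = None, prefix: str = "MQTT_"
    ) -> "MqttSettings":
        # Fall back to the real process environment when no mapping is given.
        env = os.environ if env is None else env
        return cls(
            host=env.get(prefix + "HOST", "localhost"),
            port=int(env.get(prefix + "PORT", "1883")),
            username=env.get(prefix + "USERNAME"),
            password=env.get(prefix + "PASSWORD"),
        )


# A plain dict stands in for os.environ so the sketch runs anywhere.
settings = MqttSettings.from_env({"MQTT_HOST": "broker.local", "MQTT_PORT": "8883"})
connection_kwargs = asdict(settings)

# The resulting dict could then be handed to the wrapper, e.g.:
# MosquittoClientWrapper(**connection_kwargs, topics=["test/topic"])
```

Keeping the settings in a frozen dataclass makes them easy to pass around and hard to mutate by accident; any other configuration mechanism works just as well.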
Each Python module provided by this repository is documented here with a focused explanation of its purpose and usage.
Key classes and responsibilities:
- MWMqttMessage (Pydantic model)
  - Normalized container for incoming/outgoing MQTT messages
  - Helpers like from_pahomsg(...) and fields for topic, qos, retain, payload, value, created_at, and optional metadata
- MosquittoClientWrapper
  - Thin wrapper around paho.mqtt.client.Client
  - Simplifies connection setup and topic subscriptions via set_topics([...])
  - Register callbacks per-topic (add_message_callback(topic, callback, rettype=...)) or a global callback (set_on_msg_callback)
  - Publish utilities:
    - publish_one(topic, value, created_at=None, metadata=None, rettype="valuemsg", retain=False, timeout=None)
    - publish_multiple(list[MWMqttMessage], timeout=None)
  - Connection loop helpers:
    - connect_and_start_loop_forever(topics=None, timeout_connect_seconds=None)
    - wait_for_connect_and_start_loop()
  - Convenience: automatic payload conversion for int/float/str/JSON/valuemsg
- MQTTLastDataReader
  - Static helper to retrieve the most recent messages within a configurable timeout window
  - Supports retained-only, no-retained, or mixed operation via the retained parameter
  - Returns results in different representations via rettype and fallback_rettype

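
The retained modes and the rettype/fallback_rettype conversion described above can be pictured with a standard-library-only sketch. This is not the MQTTLastDataReader implementation: RawMessage, keep_by_retained, most_recent_per_topic, and convert are hypothetical names, and reading "yes" as "retained and live messages mixed" is an assumption based on the list above.

```python
import json
from typing import Any, Dict, Iterable, List, NamedTuple


class RawMessage(NamedTuple):
    """Hypothetical stand-in for a received MQTT message."""

    topic: str
    payload: bytes
    retain: bool
    received_at: float


def keep_by_retained(messages: Iterable[RawMessage], retained: str) -> List[RawMessage]:
    # "only": retained messages only; "no": live messages only; "yes": both.
    if retained == "only":
        return [m for m in messages if m.retain]
    if retained == "no":
        return [m for m in messages if not m.retain]
    if retained == "yes":
        return list(messages)
    raise ValueError(f"unknown retained mode: {retained!r}")


def most_recent_per_topic(messages: Iterable[RawMessage]) -> Dict[str, RawMessage]:
    # Keep the message with the latest timestamp for each topic.
    latest: Dict[str, RawMessage] = {}
    for m in messages:
        current = latest.get(m.topic)
        if current is None or m.received_at >= current.received_at:
            latest[m.topic] = m
    return latest


_CONVERTERS = {"int": int, "float": float, "json": json.loads, "str": str}


def convert(payload: bytes, rettype: str, fallback_rettype: str = "str") -> Any:
    # Try the requested type first; fall back if the payload does not parse.
    text = payload.decode("utf-8")
    try:
        return _CONVERTERS[rettype](text)
    except (ValueError, TypeError):
        return _CONVERTERS[fallback_rettype](text)


messages = [
    RawMessage("home/kitchen/temperature", b"21.5", True, 1.0),
    RawMessage("home/kitchen/temperature", b"22.0", False, 2.0),
    RawMessage("home/hall/temperature", b"n/a", True, 1.5),
]

mixed = most_recent_per_topic(keep_by_retained(messages, "yes"))
result = {topic: convert(m.payload, "float", "str") for topic, m in mixed.items()}

retained_only = most_recent_per_topic(keep_by_retained(messages, "only"))
```

In this sketch the hall sensor's unparseable payload comes back as a string instead of raising, which is the kind of behaviour a fallback return type is meant to provide.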
Example – per-topic callback with type conversion:

    from mqttstuff import MosquittoClientWrapper

    client = MosquittoClientWrapper(
        host="localhost", port=1883, username="user", password="pass",
        topics=["home/+/temperature"],
    )

    def on_temperature(msg, userdata):
        # msg.value is already a number if rettype="int"/"float"
        print("Temp:", msg.value)

    client.add_message_callback("home/+/temperature", on_temperature, rettype="float")
    client.connect_and_start_loop_forever()

Small utilities used across the project:

- ComplexEncoder for JSON serialization of complex types (UUID, datetimes, dict/list pretty rendering)
- print_pretty_dict_json, get_pretty_dict_json, get_pretty_dict_json_no_sort
- update_deep(base, u) for deep dict/list merge/update
- get_exception_tb_as_string(exc) for converting exception tracebacks to strings
- get_loguru_logger_info() to introspect Loguru handlers and filters

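
To illustrate what a JSON encoder for complex types and a deep update typically do, here is a rough standard-library sketch. SketchEncoder and deep_update_sketch are hypothetical names and do not reproduce the project's ComplexEncoder or update_deep; in particular, this sketch merges nested dicts only and replaces lists wholesale, which may differ from the project's list handling.

```python
import json
import uuid
from datetime import date, datetime, timezone
from typing import Any, Dict


class SketchEncoder(json.JSONEncoder):
    """Hypothetical encoder: renders UUIDs and dates/datetimes as strings."""

    def default(self, o: Any) -> Any:
        if isinstance(o, uuid.UUID):
            return str(o)
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(o)


def deep_update_sketch(base: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge `updates` into `base` in place (dicts only)."""
    for key, value in updates.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            deep_update_sketch(base[key], value)
        else:
            # Scalars and lists are replaced, not merged, in this sketch.
            base[key] = value
    return base


config = {"mqtt": {"host": "localhost", "port": 1883}, "tags": ["a"]}
deep_update_sketch(config, {"mqtt": {"port": 8883}, "tags": ["b"]})

record = {
    "id": uuid.UUID(int=1),
    "seen": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "config": config,
}
pretty = json.dumps(record, cls=SketchEncoder, indent=2, sort_keys=True)
print(pretty)
```

The nested "mqtt" dict keeps its untouched "host" key while "port" is overwritten, which is the main difference between a deep update and a plain dict.update.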
Docker image building and publishing have been removed from this repository. If you need containerization, consider creating a separate Docker setup in your own project using this package from PyPI.
Helpful Makefile targets:
- make help – list available targets with short descriptions
- make install – create virtualenv and install development requirements
- make venv – ensure .venv exists and dev requirements are installed
- make tests – run pytest
- make lint – run Black code formatter
- make isort – fix and check import order
- make tcheck – run mypy type checks over *.py, scripts/, and mqttstuff/
- make commit-checks – run pre-commit hooks on all files
- make prepare – run tests and commit-checks (useful before committing/PRs)
- make pypibuild – build sdist and wheel with Hatch into dist/
- make pypipush – publish built artifacts with Hatch (configure credentials first)

Tests live under tests/. Run all tests with:

    pytest -q

This project is licensed under the LGPL where applicable/possible; see LICENSE.md. Some files/parts may be governed by other licenses and/or licensors, such as MIT | GPL | LGPL. Please also check file headers/comments.
See inline comments in the codebase for inspirations and references.
This is a development/experimental project. For production use, review security settings, customize configurations, and test thoroughly in your environment. Provided "as is" without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software. Use at your own risk.
