# MQTT Data Source Connectors for PySpark
[Unity Catalog](https://docs.databricks.com/en/data-governance/unity-catalog/index.html)
[Serverless Compute](https://docs.databricks.com/en/compute/serverless.html)
# Databricks Python Data Sources

Introduced in Spark 4.x, the Python Data Source API lets you create custom PySpark data sources that leverage long-standing Python libraries for handling unique file types or specialized interfaces, exposed through the Spark `read`, `readStream`, `write`, and `writeStream` APIs.

| Data Source Name | Purpose |
| --- | --- |
| [MQTT](https://pypi.org/project/paho-mqtt/) | Read MQTT messages from a broker |

---

## Configuration Options

The MQTT data source supports the following configuration options, which can be set via Spark options or environment variables:

| Option | Description | Required | Default |
|--------|-------------|----------|---------|
| `broker_address` | Hostname or IP address of the MQTT broker | Yes | - |
| `port` | Port number of the MQTT broker | No | 8883 |
| `username` | Username for broker authentication | No | "" |
| `password` | Password for broker authentication | No | "" |
| `topic` | MQTT topic to subscribe/publish to | No | "#" |
| `qos` | Quality of Service level (0, 1, or 2) | No | 0 |
| `require_tls` | Enable SSL/TLS (true/false) | No | true |
| `keepalive` | Keep-alive interval in seconds | No | 60 |
| `clean_session` | Clean session flag (true/false) | No | false |
| `conn_time` | Connection timeout in seconds | No | 1 |
| `ca_certs` | Path to the CA certificate file | No | - |
| `certfile` | Path to the client certificate file | No | - |
| `keyfile` | Path to the client key file | No | - |
| `tls_disable_certs` | Disable certificate verification (true/false) | No | - |

You can set these options in your PySpark code, for example:
```python
display(
    spark.readStream.format("mqtt_pub_sub")
    .option("topic", "#")
    .option("broker_address", "host")
    .option("username", "secret_user")
    .option("password", "secret_password")
    .option("qos", 2)
    .option("require_tls", False)
    .load()
)
```
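When the same settings come from the environment (for example, the `MQTT_*` variables used by the test suite below), a small helper can assemble the option map before handing it to the reader. This is an illustrative sketch only; the helper name and the fallback defaults mirror the table above and are not part of the connector's API:

```python
import os

def mqtt_options_from_env() -> dict:
    """Build MQTT reader options from environment variables (hypothetical helper).

    Falls back to the documented defaults when a variable is unset;
    broker_address is required, so a missing MQTT_BROKER_HOST raises KeyError.
    """
    return {
        "broker_address": os.environ["MQTT_BROKER_HOST"],
        "port": os.environ.get("MQTT_BROKER_PORT", "8883"),
        "username": os.environ.get("MQTT_USERNAME", ""),
        "password": os.environ.get("MQTT_PASSWORD", ""),
        "topic": os.environ.get("MQTT_BROKER_TOPIC_PREFIX", "#"),
    }

# Usage (inside a Spark session):
# reader = spark.readStream.format("mqtt_pub_sub").options(**mqtt_options_from_env())
```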

---

## Building and Running Tests

* Clone the repo
* Create a virtual environment (Python 3.11)
* Ensure Docker/Podman is installed and properly configured
* Spin up a Docker container for a local MQTT server:
```yaml
version: "3.7"
services:
  mqtt5:
    userns_mode: keep-id
    image: eclipse-mosquitto
    container_name: mqtt5
    ports:
      - "1883:1883"  # default MQTT port
      - "9001:9001"  # default MQTT-over-WebSockets port
    volumes:
      - ./config:/mosquitto/config:rw
      - ./data:/mosquitto/data:rw
      - ./log:/mosquitto/log:rw
    restart: unless-stopped
```
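The compose file mounts `./config` into the container, and Mosquitto looks for a `mosquitto.conf` there. A minimal config for local testing might look like the following (assumed values; `allow_anonymous true` is for local development only):

```
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
```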
* Create a `.env` file in the project root directory:
```dotenv
MQTT_BROKER_HOST=
MQTT_BROKER_PORT=
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_BROKER_TOPIC_PREFIX=
```
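If you want to inspect those values without pulling in a dependency such as python-dotenv, a minimal parser is easy to sketch (illustrative only; the real test harness may load the file differently):

```python
from pathlib import Path

def load_dotenv(path: str = ".env") -> dict:
    """Minimal .env parser: KEY=VALUE lines; blanks and # comments are skipped."""
    env = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env
```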

* Run tests from the project root directory:
```shell
make test
```

* Build the package:
```shell
python -m build
```

---

## Example Usage

```python
spark.dataSource.register(MqttDataSource)

df = (
    spark.readStream.format("mqtt_pub_sub")
    .option("topic", "#")
    .option("broker_address", "host")
    .option("username", "secret_user")
    .option("password", "secret_password")
    .option("qos", 2)
    .option("require_tls", False)
    .load()
)
display(df)

df.writeStream.format("console").start().awaitTermination()
```
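Downstream processing depends on the schema the source emits, which this README does not spell out. As an illustration only, assuming each row carries the raw MQTT payload as bytes containing UTF-8 JSON, a decoder for use in a UDF or `foreachBatch` handler might look like:

```python
import json

def decode_payload(payload: bytes) -> dict:
    """Decode an MQTT payload assumed to carry UTF-8 encoded JSON."""
    return json.loads(payload.decode("utf-8"))

# Example message as it might arrive from a sensor topic:
record = decode_payload(b'{"sensor": "temp-01", "value": 21.5}')
print(record["value"])  # 21.5
```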

---

## Project Support

The code in this project is provided **for exploration purposes only** and is **not formally supported** by Databricks under any Service Level Agreements (SLAs). It is provided **AS-IS**, without any warranties or guarantees.

Please **do not submit support tickets** to Databricks for issues related to the use of this project.

The source code provided is subject to the Databricks [LICENSE](https://github.com/databricks-industry-solutions/python-data-sources/blob/main/LICENSE.md). All third-party libraries included or referenced are subject to their respective licenses set forth in the project license.

Any issues or bugs found should be submitted as **GitHub Issues** on the project repository. While these will be reviewed as time permits, there are **no formal SLAs** for support.

## 📄 Third-Party Package Licenses

© 2025 Databricks, Inc. All rights reserved. The source in this project is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third-party libraries are subject to the licenses set forth below.

| Datasource | Package | Purpose | License | Source |
| ---------- | --------- | ------------------- | ----------------- | ----------------------------------- |
| MQTT | paho-mqtt | Python API for MQTT | EPL-2.0 & EDL-1.0 | https://pypi.org/project/paho-mqtt/ |

## References

- [Paho MQTT Python Client](https://pypi.org/project/paho-mqtt/)
- [Eclipse Mosquitto](https://mosquitto.org/)
- [Databricks Python Data Source API](https://docs.databricks.com/en/data-engineering/data-sources/python-data-sources.html)