178 changes: 178 additions & 0 deletions mqtt/.gitignore
# Databricks-specific files
.DS_Store
.python-version

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc
24 changes: 24 additions & 0 deletions mqtt/LICENSE.md
# DB license
**Definitions.**

Agreement: The agreement between Databricks, Inc., and you governing the use of the Databricks Services, as that term is defined in the Master Cloud Services Agreement (MCSA) located at www.databricks.com/legal/mcsa.

Licensed Materials: The source code, object code, data, and/or other works to which this license applies.

**Scope of Use.** You may not use the Licensed Materials except in connection with your use of the Databricks Services pursuant to the Agreement. Your use of the Licensed Materials must comply at all times with any restrictions applicable to the Databricks Services, generally, and must be used in accordance with any applicable documentation. You may view, use, copy, modify, publish, and/or distribute the Licensed Materials solely for the purposes of using the Licensed Materials within or connecting to the Databricks Services. If you do not agree to these terms, you may not view, use, copy, modify, publish, and/or distribute the Licensed Materials.

**Redistribution.** You may redistribute and sublicense the Licensed Materials so long as all use is in compliance with these terms. In addition:

- You must give any other recipients a copy of this License;
- You must cause any modified files to carry prominent notices stating that you changed the files;
- You must retain, in any derivative works that you distribute, all copyright, patent, trademark, and attribution notices, excluding those notices that do not pertain to any part of the derivative works; and
- If a "NOTICE" text file is provided as part of its distribution, then any derivative works that you distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the derivative works.


You may add your own copyright statement to your modifications and may provide additional license terms and conditions for use, reproduction, or distribution of your modifications, or for any such derivative works as a whole, provided your use, reproduction, and distribution of the Licensed Materials otherwise complies with the conditions stated in this License.

**Termination.** This license terminates automatically upon your breach of these terms or upon the termination of your Agreement. Additionally, Databricks may terminate this license at any time on notice. Upon termination, you must permanently delete the Licensed Materials and all copies thereof.

**DISCLAIMER; LIMITATION OF LIABILITY.**

THE LICENSED MATERIALS ARE PROVIDED “AS-IS” AND WITH ALL FAULTS. DATABRICKS, ON BEHALF OF ITSELF AND ITS LICENSORS, SPECIFICALLY DISCLAIMS ALL WARRANTIES RELATING TO THE LICENSED MATERIALS, EXPRESS AND IMPLIED, INCLUDING, WITHOUT LIMITATION, IMPLIED WARRANTIES, CONDITIONS AND OTHER TERMS OF MERCHANTABILITY, SATISFACTORY QUALITY OR FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. DATABRICKS’ AND ITS LICENSORS’ TOTAL AGGREGATE LIABILITY RELATING TO OR ARISING OUT OF YOUR USE OF OR DATABRICKS’ PROVISIONING OF THE LICENSED MATERIALS SHALL BE LIMITED TO ONE THOUSAND ($1,000) DOLLARS. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS OR THE USE OR OTHER DEALINGS IN THE LICENSED MATERIALS.
28 changes: 28 additions & 0 deletions mqtt/Makefile
.PHONY: all clean dev test style check

all: clean style test

clean: ## Remove build artifacts and cache files
	rm -rf build/
	rm -rf dist/
	rm -rf *.egg-info/
	rm -rf htmlcov/
	rm -f .coverage coverage.xml
	rm -rf .pytest_cache/
	rm -rf .mypy_cache/
	rm -rf .ruff_cache/
	find . -type d -name __pycache__ -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete

test:
	pip install -r requirements.txt
	pytest .

dev:
	pip install -r requirements.txt

style:
	pre-commit run --all-files

check: style test
138 changes: 138 additions & 0 deletions mqtt/README.md
# MQTT Data Source Connectors for PySpark
[![Unity Catalog](https://img.shields.io/badge/Unity_Catalog-Enabled-00A1C9?style=for-the-badge)](https://docs.databricks.com/en/data-governance/unity-catalog/index.html)
[![Serverless](https://img.shields.io/badge/Serverless-Compute-00C851?style=for-the-badge)](https://docs.databricks.com/en/compute/serverless.html)

Introduced in Spark 4.x, the Python Data Source API lets you build PySpark data sources on top of long-standing Python libraries, exposing unique file types or specialized interfaces through the Spark `read`, `readStream`, `write`, and `writeStream` APIs.

| Data Source Name | Purpose |
| --- | --- |
| [MQTT](https://pypi.org/project/paho-mqtt/) | Read MQTT messages from a broker |

---

## Configuration Options

The MQTT data source supports the following configuration options, which can be set via Spark options or environment variables:

| Option | Description | Required | Default |
|--------|-------------|----------|---------|
| `broker_address` | Hostname or IP address of the MQTT broker | Yes | - |
| `port` | Port number of the MQTT broker | No | 8883 |
| `username` | Username for broker authentication | No | "" |
| `password` | Password for broker authentication | No | "" |
| `topic` | MQTT topic to subscribe/publish to | No | "#" |
| `qos` | Quality of Service level (0, 1, or 2) | No | 0 |
| `require_tls` | Enable SSL/TLS (true/false) | No | true |
| `keepalive` | Keep alive interval in seconds | No | 60 |
| `clean_session` | Clean session flag (true/false) | No | false |
| `conn_time` | Connection timeout in seconds | No | 1 |
| `ca_certs` | Path to CA certificate file | No | - |
| `certfile` | Path to client certificate file | No | - |
| `keyfile` | Path to client key file | No | - |
| `tls_disable_certs` | Disable certificate verification | No | - |

You can set these options in your PySpark code, for example:
```python
display(
spark.readStream.format("mqtt_pub_sub")
.option("topic", "#")
.option("broker_address", "host")
.option("username", "secret_user")
.option("password", "secret_password")
.option("qos", 2)
.option("require_tls", False)
.load()
)
```

---

## Building and Running Tests

* Clone the repo
* Create a virtual environment (Python 3.11)
* Ensure Docker/Podman is installed and properly configured
* Spin up a Docker container for a local MQTT Server:
```yaml
version: "3.7"
services:
mqtt5:
userns_mode: keep-id
image: eclipse-mosquitto
container_name: mqtt5
ports:
- "1883:1883" # default mqtt port
- "9001:9001" # default mqtt port for websockets
volumes:
- ./config:/mosquitto/config:rw
- ./data:/mosquitto/data:rw
- ./log:/mosquitto/log:rw
restart: unless-stopped
```

* Create a `.env` file in the project root directory:
```dotenv
MQTT_BROKER_HOST=
MQTT_BROKER_PORT=
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_BROKER_TOPIC_PREFIX=
```

* Run the tests from the project root directory
```shell
make test
```

* Build the package
```shell
python -m build
```
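The compose file above mounts a `./config` directory but does not show a broker configuration, and Mosquitto will not accept remote connections without an explicit listener. A minimal `./config/mosquitto.conf` for local testing might look like the sketch below; `allow_anonymous true` is an assumption for a local sandbox only, not something this project prescribes:

```conf
# Minimal Mosquitto config for local testing ONLY.
# allow_anonymous true is unsafe outside a local sandbox.
listener 1883
listener 9001
protocol websockets
allow_anonymous true

persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
```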

---

## Example Usage

```python
spark.dataSource.register(MqttDataSource)

df = (
    spark.readStream.format("mqtt_pub_sub")
    .option("topic", "#")
    .option("broker_address", "host")
    .option("username", "secret_user")
    .option("password", "secret_password")
    .option("qos", 2)
    .option("require_tls", False)
    .load()
)

df.writeStream.format("console").start().awaitTermination()
```
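MQTT payloads are raw bytes on the wire, and many deployments carry JSON. The standalone helper below is hypothetical (it is independent of the connector and makes no claim about its actual output schema); it just sketches decoding payloads defensively before analysis:

```python
import json

# Hypothetical helper, not part of the connector: decode an MQTT payload,
# returning parsed JSON when possible and the raw text otherwise.
def decode_payload(payload: bytes):
    text = payload.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text
```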

---

## Project Support

The code in this project is provided **for exploration purposes only** and is **not formally supported** by Databricks under any Service Level Agreements (SLAs). It is provided **AS-IS**, without any warranties or guarantees.

Please **do not submit support tickets** to Databricks for issues related to the use of this project.

The source code provided is subject to the Databricks [LICENSE](https://github.com/databricks-industry-solutions/python-data-sources/blob/main/LICENSE.md). All third-party libraries included or referenced are subject to their respective licenses set forth in the project license.

Any issues or bugs found should be submitted as **GitHub Issues** on the project repository. While these will be reviewed as time permits, there are **no formal SLAs** for support.

## 📄 Third-Party Package Licenses

© 2025 Databricks, Inc. All rights reserved. The source in this project is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third-party libraries are subject to the licenses set forth below.

| Datasource | Package | Purpose | License | Source |
| ---------- | ---------- | --------------------------------- | ----------- | ------------------------------------ |
| paho-mqtt | paho-mqtt | Python API for MQTT | EPL-2.0 & EDL-1.0 | https://pypi.org/project/paho-mqtt/ |

## References

- [Paho MQTT Python Client](https://pypi.org/project/paho-mqtt/)
- [Eclipse Mosquitto](https://mosquitto.org/)
- [Databricks Python Data Source API](https://docs.databricks.com/en/data-engineering/data-sources/python-data-sources.html)