diff --git a/mqtt/apps/.gitkeep b/mqtt/apps/.gitkeep
new file mode 100644
index 0000000..d409c8b
--- /dev/null
+++ b/mqtt/apps/.gitkeep
@@ -0,0 +1,5 @@
+# Add your Databricks Apps here
+# Each app should be in its own subdirectory with:
+# - app.py (main application file)
+# - app.yaml (configuration)
+# - requirements.txt (dependencies)
\ No newline at end of file
diff --git a/mqtt/apps/MQTT Data Monitor/app.py b/mqtt/apps/MQTT Data Monitor/app.py
new file mode 100644
index 0000000..dcc6427
--- /dev/null
+++ b/mqtt/apps/MQTT Data Monitor/app.py
@@ -0,0 +1,374 @@
+from flask import Flask, request, jsonify, render_template
+import logging
+from datetime import datetime, timedelta
+import pandas
+from databricks import sql
+from databricks.sdk.core import Config
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service import jobs
+import os
+import json
+import time
+import re
+import paho.mqtt.client as mqtt_client
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+
+# Create logger for this application
+logger = logging.getLogger(__name__)
+
+# Suppress werkzeug logging to reduce noise
+werkzeug_log = logging.getLogger('werkzeug')
+werkzeug_log.setLevel(logging.ERROR)
+
+flask_app = Flask(__name__)
+
+
+client = WorkspaceClient()
+
+# Database connection function
+def get_data(query, warehouse_id, params=None):
+ """Execute query with fallback to demo data"""
+ try:
+ cfg = Config()
+ if warehouse_id and cfg.host:
+ with sql.connect(
+ server_hostname=cfg.host,
+ http_path=f"/sql/1.0/warehouses/{warehouse_id}",
+ credentials_provider=lambda: cfg.authenticate
+ ) as connection:
+ if params:
+ df = pandas.read_sql(query, connection, params=params)
+ else:
+ df = pandas.read_sql(query, connection)
+
+ return df.to_dict('records')
+ except Exception as e:
+ logger.error(f"Database query failed: {e}")
+ # Return empty list on error so we can show a message
+ return []
+
+
+def create_job(client, notebook_path, cluster_id):
+
+ created_job = client.jobs.create(
+ name=f"mqtt_{time.time_ns()}",
+ tasks=[
+ jobs.Task(
+ description="mqtt",
+ notebook_task=jobs.NotebookTask(notebook_path=notebook_path, source=jobs.Source("WORKSPACE")),
+ task_key="mqtt",
+ timeout_seconds=0,
+ existing_cluster_id=cluster_id,
+ )
+ ],
+ )
+ return created_job
+
+def run_job(client, created_job, mqtt_config):
+ """Run the job with user-provided MQTT configuration"""
+ run = client.jobs.run_now(
+ job_id=created_job.job_id,
+ notebook_params={
+ "catalog": mqtt_config.get('catalog', 'dbdemos'),
+ "database": mqtt_config.get('schema', 'dbdemos_mqtt'),
+ "table": mqtt_config.get('table', 'mqtt_v5'),
+ "broker": mqtt_config.get('broker_address', ''),
+ "port": mqtt_config.get('port', '8883'),
+ "username": mqtt_config.get('username', ''),
+ "password": mqtt_config.get('password', ''),
+ "topic": mqtt_config.get('topic', '#'),
+ "qos": mqtt_config.get('qos', '0'),
+ "require_tls": mqtt_config.get('require_tls', 'false'),
+ "keepalive": mqtt_config.get('keepalive', '60'),
+ }
+ )
+ return run
+
+
+def mqtt_remote_client(mqtt_server_config):
+ """Test MQTT connection with provided configuration"""
+ connection_result = {
+ 'success': False,
+ 'message': '',
+ 'error': None
+ }
+ try:
+ # Callback function for when the client connects
+ def on_connect(client, userdata, flags, rc):
+ if rc == 0:
+ logger.info("Connected successfully to MQTT Broker!")
+ else:
+ logger.error(f"Failed to connect, return code {rc}")
+
+ client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id="mqtt_connection_test", clean_session=True)
+
+ # Set username and password if provided
+ if mqtt_server_config.get("username") and mqtt_server_config.get("password"):
+ client.username_pw_set(
+ username=mqtt_server_config["username"],
+ password=mqtt_server_config["password"]
+ )
+
+ # Set TLS if required
+ if mqtt_server_config.get("require_tls") == 'true':
+ import ssl
+ tls_config = {}
+ if mqtt_server_config.get("ca_certs"):
+ tls_config['ca_certs'] = mqtt_server_config["ca_certs"]
+ if mqtt_server_config.get("certfile"):
+ tls_config['certfile'] = mqtt_server_config["certfile"]
+ if mqtt_server_config.get("keyfile"):
+ tls_config['keyfile'] = mqtt_server_config["keyfile"]
+
+ # If disable certs verification is checked
+ if mqtt_server_config.get("tls_disable_certs") == 'true':
+ tls_config['cert_reqs'] = ssl.CERT_NONE
+
+ client.tls_set(**tls_config)
+
+ client.on_connect = on_connect
+
+ # Attempt connection
+ port = int(mqtt_server_config.get("port", 8883))
+ keepalive = int(mqtt_server_config.get("keepalive", 60))
+
+ client.connect(mqtt_server_config["host"], port, keepalive)
+
+ # Start the loop to process network traffic
+ client.loop_start()
+
+ # Give it a moment to connect
+ time.sleep(2)
+
+ # Check if connected
+ if client.is_connected():
+ connection_result['success'] = True
+ connection_result['message'] = f'Successfully connected to MQTT broker at {mqtt_server_config["host"]}:{port}'
+ else:
+ connection_result['message'] = 'Failed to connect to MQTT broker'
+ connection_result['error'] = f'Failed to connect to {mqtt_server_config["host"]} and {port} and {mqtt_server_config["username"]} are not working'
+
+ # Disconnect
+ client.loop_stop()
+ client.disconnect()
+
+ except Exception as e:
+ connection_result['success'] = False
+ connection_result['message'] = 'Connection failed'
+ connection_result['error'] = str(e)
+
+ return connection_result
+
+
+# Initialize with empty data - will be loaded when user specifies catalog/schema/table and clicks refresh
+curr_data = []
+
+
+def get_mqtt_stats():
+ """Get MQTT message statistics from data"""
+ if not curr_data or len(curr_data) == 0:
+ return {
+ 'duplicated_messages': 0,
+ 'qos2_messages': 0,
+ 'unique_topics': 0,
+ 'total_messages': 0
+ }
+
+ # Count duplicated messages
+ duplicated = len([a for a in curr_data if str(a.get('is_duplicate', 'false')).lower() == 'true'])
+ # Count QoS 2 messages
+ qos2_messages = len([a for a in curr_data if str(a.get('qos', 0)) == '2'])
+ # Count unique topics
+ unique_topics = len(set([str(a.get('topic', '')) for a in curr_data]))
+ # Total row count
+ row_count = len(curr_data)
+
+ return {
+ 'duplicated_messages': duplicated,
+ 'qos2_messages': qos2_messages,
+ 'unique_topics': unique_topics,
+ 'total_messages': row_count
+ }
+
+
+@flask_app.route('/api/test-mqtt-connection', methods=['POST'])
+def test_mqtt_connection():
+ """API endpoint to test MQTT broker connection"""
+ try:
+ # Get the configuration from the request
+ mqtt_config = request.json
+
+ # Validate required fields
+ if not mqtt_config.get('broker_address'):
+ return jsonify({
+ 'success': False,
+ 'error': 'Broker address is required'
+ }), 400
+
+ # Prepare config for MQTT test
+ mqtt_server_config = {
+ 'host': mqtt_config.get('broker_address'),
+ 'port': mqtt_config.get('port', '1883'),
+ 'username': mqtt_config.get('username', ''),
+ 'password': mqtt_config.get('password', ''),
+ 'require_tls': mqtt_config.get('require_tls', 'false'),
+ 'ca_certs': mqtt_config.get('ca_certs', ''),
+ 'certfile': mqtt_config.get('certfile', ''),
+ 'keyfile': mqtt_config.get('keyfile', ''),
+ 'tls_disable_certs': mqtt_config.get('tls_disable_certs', 'false'),
+ 'keepalive': mqtt_config.get('keepalive', '60')
+ }
+ # Test the connection
+ result = mqtt_remote_client(mqtt_server_config)
+
+ if result['success']:
+ return jsonify(result), 200
+ else:
+ return jsonify(result), 400
+
+ except Exception as e:
+ return jsonify({
+ 'success': False,
+ 'message': 'Connection test failed',
+ 'error': str(e)
+ }), 500
+
+
+@flask_app.route('/api/start-mqtt-job', methods=['POST'])
+def start_mqtt_job():
+ """API endpoint to start the MQTT data ingestion job with user configuration"""
+ try:
+ # Get the configuration from the request
+ mqtt_config = request.json
+
+ # Validate required fields
+ if not mqtt_config.get('broker_address'):
+ return jsonify({
+ 'success': False,
+ 'error': 'Broker address is required'
+ }), 400
+
+ if not mqtt_config.get('catalog') or not mqtt_config.get('schema') or not mqtt_config.get('table'):
+ return jsonify({
+ 'success': False,
+ 'error': 'Catalog, Schema, and Table name are required'
+ }), 400
+
+ # Get notebook_path and cluster_id from config
+ notebook_path = mqtt_config.get('notebook_path')
+ cluster_id = mqtt_config.get('cluster_id')
+
+ # Create the job
+ created_job = create_job(client, notebook_path, cluster_id)
+
+ # Run the job with user configuration
+ run = run_job(client, created_job, mqtt_config)
+
+ catalog = mqtt_config.get('catalog')
+ schema = mqtt_config.get('schema')
+ table = mqtt_config.get('table')
+
+ return jsonify({
+ 'success': True,
+ 'job_id': created_job.job_id,
+ 'run_id': run.run_id,
+ 'message': f'MQTT data ingestion job started successfully. Data will be written to {catalog}.{schema}.{table}'
+ })
+
+ except Exception as e:
+ return jsonify({
+ 'success': False,
+ 'error': str(e)
+ }), 500
+
+
+@flask_app.route('/api/refresh-data', methods=['POST'])
+def refresh_data():
+ """API endpoint to refresh dashboard data from specified table"""
+ global curr_data
+ try:
+ data = request.json
+ catalog = data.get('catalog')
+ schema = data.get('schema')
+ table = data.get('table')
+ warehouse_id = data.get('warehouse_id', '4b9b953939869799') # Default fallback
+
+ # Validate required fields
+ if not catalog or not schema or not table:
+ return jsonify({
+ 'success': False,
+ 'error': 'Catalog, Schema, and Table name are required'
+ }), 400
+
+ # Build the query with parameterized values
+ query = "SELECT message, is_duplicate, qos, topic, received_time FROM %s.%s.%s ORDER BY received_time DESC LIMIT %s"
+
+ # Fetch data using get_data function with parameters
+ curr_data = get_data(query, warehouse_id, (catalog, schema, table, 100))
+
+ # Calculate stats from refreshed data
+ stats = get_mqtt_stats()
+
+ return jsonify({
+ 'success': True,
+ 'message': f'Data refreshed from {catalog}.{schema}.{table}',
+ 'row_count': len(curr_data) if curr_data else 0,
+ 'data': curr_data, # Return the actual data to update the UI
+ 'stats': stats # Return the calculated stats
+ })
+ except Exception as e:
+ return jsonify({
+ 'success': False,
+ 'error': str(e)
+ }), 500
+
+
+@flask_app.route('/')
+def dashboard():
+ """Main MQTT Data Monitor dashboard page"""
+ stats = get_mqtt_stats()
+
+ return render_template('dashboard.html', stats=stats, curr_data=curr_data)
+
+def get_alert_icon(alert_type):
+ if 'IoT Sensor' in alert_type:
+ return 'fa-thermometer-half'
+ elif 'Device Status' in alert_type:
+ return 'fa-wifi'
+ elif 'System Metrics' in alert_type:
+ return 'fa-chart-bar'
+ elif 'Security' in alert_type:
+ return 'fa-shield-alt'
+ else:
+ return 'fa-broadcast-tower'
+
+def get_status_color(status):
+ if status == 'new':
+ return 'bg-blue-100 text-blue-800'
+ elif status == 'in_progress':
+ return 'bg-yellow-100 text-yellow-800'
+ elif status == 'resolved':
+ return 'bg-green-100 text-green-800'
+ else:
+ return 'bg-gray-100 text-gray-800'
+
+def get_severity_color(severity):
+ if severity == 'CRITICAL':
+ return 'bg-red-100 text-red-800'
+ elif severity == 'WARNING':
+ return 'bg-yellow-100 text-yellow-800'
+ elif severity == 'INFO':
+ return 'bg-blue-100 text-blue-800'
+ else:
+ return 'bg-gray-100 text-gray-800'
+
+if __name__ == '__main__':
+ logger.info("Starting MQTT Data Monitor Dashboard")
+ logger.info("MQTT Message Processing & Analytics Platform")
+ logger.info("=" * 50)
+ flask_app.run(debug=True, host='0.0.0.0', port=8001)
diff --git a/mqtt/apps/MQTT Data Monitor/app.yaml b/mqtt/apps/MQTT Data Monitor/app.yaml
new file mode 100644
index 0000000..2977f51
--- /dev/null
+++ b/mqtt/apps/MQTT Data Monitor/app.yaml
@@ -0,0 +1,6 @@
+command: [
+ "flask",
+ "--app",
+ "app.py",
+ "run"
+]
diff --git a/mqtt/apps/MQTT Data Monitor/requirements.txt b/mqtt/apps/MQTT Data Monitor/requirements.txt
new file mode 100644
index 0000000..8e9b046
--- /dev/null
+++ b/mqtt/apps/MQTT Data Monitor/requirements.txt
@@ -0,0 +1,4 @@
+databricks-sdk>=0.66.0
+flask>=2.3.0
+pandas>=1.5.0
+paho-mqtt>=2.0.0
\ No newline at end of file
diff --git a/mqtt/apps/MQTT Data Monitor/static/css/style.css b/mqtt/apps/MQTT Data Monitor/static/css/style.css
new file mode 100644
index 0000000..11b8cd7
--- /dev/null
+++ b/mqtt/apps/MQTT Data Monitor/static/css/style.css
@@ -0,0 +1,214 @@
+/* MQTT Theme Status Colors */
+.status-new {
+ background-color: #e3f2fd;
+ color: #0078D4;
+}
+
+.status-in_progress {
+ background-color: #fff3e0;
+ color: #ff8c00;
+}
+
+.status-resolved {
+ background-color: #e8f5e8;
+ color: #00a86b;
+}
+
+/* MQTT Theme Priority Colors */
+.severity-critical {
+ background-color: #ffebee;
+ color: #e74c3c;
+}
+
+.severity-warning {
+ background-color: #fff3e0;
+ color: #ff8c00;
+}
+
+.severity-info {
+ background-color: #e3f2fd;
+ color: #0078D4;
+}
+
+/* MQTT Hover Effects */
+.mqtt-hover:hover {
+ background-color: #f8f9fa;
+ transition: all 0.2s ease;
+}
+
+/* Tab Styles */
+.tab-button.active {
+ border-color: #0078D4 !important;
+ color: #0078D4 !important;
+}
+
+.tab-content.hidden {
+ display: none;
+}
+
+/* Custom Responsive Styles */
+@media (max-width: 768px) {
+ .grid-cols-1 {
+ grid-template-columns: repeat(1, minmax(0, 1fr));
+ }
+
+ .md\:grid-cols-2 {
+ grid-template-columns: repeat(1, minmax(0, 1fr));
+ }
+
+ .lg\:grid-cols-4 {
+ grid-template-columns: repeat(1, minmax(0, 1fr));
+ }
+
+ .space-x-8 > :not([hidden]) ~ :not([hidden]) {
+ margin-left: 1rem;
+ }
+
+ .px-4 {
+ padding-left: 0.5rem;
+ padding-right: 0.5rem;
+ }
+}
+
+/* Loading Animation */
+.loading {
+ position: relative;
+}
+
+.loading::after {
+ content: '';
+ position: absolute;
+ top: 50%;
+ left: 50%;
+ width: 20px;
+ height: 20px;
+ margin: -10px 0 0 -10px;
+ border: 2px solid #f3f3f3;
+ border-top: 2px solid #0078D4;
+ border-radius: 50%;
+ animation: spin 1s linear infinite;
+}
+
+@keyframes spin {
+ 0% { transform: rotate(0deg); }
+ 100% { transform: rotate(360deg); }
+}
+
+/* Custom Form Elements */
+.form-input:focus {
+ outline: none;
+ border-color: #0078D4;
+ box-shadow: 0 0 0 3px rgba(0, 120, 212, 0.1);
+}
+
+/* Notification Styles */
+.notification-enter {
+ animation: slideIn 0.3s ease-out;
+}
+
+.notification-leave {
+ animation: slideOut 0.3s ease-in;
+}
+
+@keyframes slideIn {
+ from {
+ transform: translateY(-100%);
+ opacity: 0;
+ }
+ to {
+ transform: translateY(0);
+ opacity: 1;
+ }
+}
+
+@keyframes slideOut {
+ from {
+ transform: translateY(0);
+ opacity: 1;
+ }
+ to {
+ transform: translateY(-100%);
+ opacity: 0;
+ }
+}
+
+/* Table Enhancements */
+.table-row-hover:hover {
+ background-color: rgba(0, 120, 212, 0.05);
+ transition: background-color 0.15s ease;
+}
+
+/* Button Enhancements */
+.btn-primary {
+ background-color: #0078D4;
+ transition: all 0.2s ease;
+}
+
+.btn-primary:hover {
+ background-color: #106ebe;
+ transform: translateY(-1px);
+ box-shadow: 0 4px 8px rgba(0, 120, 212, 0.3);
+}
+
+.btn-secondary {
+ background-color: #037da5;
+ transition: all 0.2s ease;
+}
+
+.btn-secondary:hover {
+ background-color: #026d91;
+ transform: translateY(-1px);
+ box-shadow: 0 4px 8px rgba(3, 125, 165, 0.3);
+}
+
+/* Card Enhancements */
+.card-shadow {
+ box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
+ transition: box-shadow 0.3s ease;
+}
+
+.card-shadow:hover {
+ box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05);
+}
+
+/* Search Input Enhancements */
+.search-input {
+ transition: all 0.2s ease;
+}
+
+.search-input:focus {
+ transform: scale(1.02);
+ box-shadow: 0 4px 12px rgba(0, 120, 212, 0.15);
+}
+
+/* Badge Styles */
+.badge {
+ display: inline-flex;
+ align-items: center;
+ padding: 0.25rem 0.75rem;
+ border-radius: 9999px;
+ font-size: 0.75rem;
+ font-weight: 500;
+ text-transform: uppercase;
+ letter-spacing: 0.025em;
+}
+
+.badge-success {
+ background-color: rgba(0, 168, 107, 0.1);
+ color: #00a86b;
+}
+
+.badge-error {
+ background-color: rgba(231, 76, 60, 0.1);
+ color: #e74c3c;
+}
+
+.badge-warning {
+ background-color: rgba(255, 140, 0, 0.1);
+ color: #ff8c00;
+}
+
+.badge-info {
+ background-color: rgba(0, 120, 212, 0.1);
+ color: #0078D4;
+}
diff --git a/mqtt/apps/MQTT Data Monitor/static/js/app.js b/mqtt/apps/MQTT Data Monitor/static/js/app.js
new file mode 100644
index 0000000..6660f3b
--- /dev/null
+++ b/mqtt/apps/MQTT Data Monitor/static/js/app.js
@@ -0,0 +1,713 @@
+console.log('MQTT Data Monitor loaded successfully!');
+console.log('Theme: HiveMQ & Azure Event Grid inspired');
+
+// Function to update stat counters
+function updateStatsCards(stats) {
+ console.log('Updating stats cards with:', stats);
+
+ // Update Duplicated Messages
+ const duplicatedElement = document.querySelector('.text-2xl.font-semibold.text-mqtt-error');
+ if (duplicatedElement) {
+ duplicatedElement.textContent = stats.duplicated_messages || 0;
+ }
+
+ // Update QOS=2 Messages
+ const qos2Element = document.querySelector('.text-2xl.font-semibold.text-mqtt-warning');
+ if (qos2Element) {
+ qos2Element.textContent = stats.qos2_messages || 0;
+ }
+
+ // Update Unique Topics
+ const topicsElement = document.querySelector('.text-2xl.font-semibold.text-mqtt-success');
+ if (topicsElement) {
+ topicsElement.textContent = stats.unique_topics || 0;
+ }
+
+ // Update Total Messages
+ const totalElement = document.querySelector('.text-2xl.font-semibold.text-mqtt-azure');
+ if (totalElement) {
+ totalElement.textContent = stats.total_messages || 0;
+ }
+}
+
+// Function to update table with new data
+function updateTableWithData(data) {
+ const tableBody = document.getElementById('messageTableBody');
+
+ // Clear existing rows (except the "no results" row)
+ const noResultsRow = document.getElementById('noResultsRow');
+ tableBody.innerHTML = '';
+ if (noResultsRow) {
+ tableBody.appendChild(noResultsRow);
+ }
+
+ // If no data, show empty state
+ if (!data || data.length === 0) {
+ tableBody.innerHTML = `
+
+
+
+
+ No Data Found
+ No messages found in the specified table.
+
+ |
+
+ `;
+ return;
+ }
+
+ // Build HTML for each row
+ data.forEach(row => {
+ const isDup = String(row.is_duplicate || 'false').toLowerCase();
+ const dupColor = isDup === 'true' ? 'bg-red-100 text-red-800' : 'bg-green-100 text-green-800';
+ const dupText = isDup === 'true' ? 'Yes' : 'No';
+
+ const qosValue = String(row.qos || 0);
+ const qosColor = qosValue === '2' ? 'bg-blue-100 text-blue-800' :
+ qosValue === '1' ? 'bg-yellow-100 text-yellow-800' :
+ 'bg-gray-100 text-gray-800';
+
+ const message = String(row.message || '');
+ const messageDisplay = message.length > 100 ? message.substring(0, 100) + '...' : message;
+
+ const topic = String(row.topic || 'N/A');
+ const receivedTime = String(row.received_time || 'N/A');
+
+ const rowHtml = `
+
+ |
+
+ |
+
+
+ ${dupText}
+
+ |
+
+
+ QoS ${qosValue}
+
+ |
+
+ ${topic}
+ |
+
+ ${receivedTime}
+ |
+
+ `;
+
+ tableBody.insertAdjacentHTML('beforeend', rowHtml);
+ });
+
+ // Re-apply search filter if active
+ const searchInput = document.getElementById('searchInput');
+ if (searchInput && searchInput.value) {
+ filterTable();
+ }
+}
+
+// Notification Functions
+function showNotification(type, title, message) {
+ const modal = document.getElementById('notificationModal');
+ const icon = document.getElementById('notificationIcon');
+ const titleEl = document.getElementById('notificationTitle');
+ const messageEl = document.getElementById('notificationMessage');
+ const button = document.getElementById('notificationButton');
+
+ // Set content
+ titleEl.textContent = title;
+ messageEl.textContent = message;
+
+ // Set icon and colors based on type
+ if (type === 'success') {
+ icon.innerHTML = '
';
+ titleEl.className = 'text-lg font-medium text-mqtt-success';
+ button.className = 'w-full px-4 py-2 rounded-lg text-white font-medium bg-mqtt-success hover:bg-green-700';
+ } else if (type === 'error') {
+ icon.innerHTML = '
';
+ titleEl.className = 'text-lg font-medium text-mqtt-error';
+ button.className = 'w-full px-4 py-2 rounded-lg text-white font-medium bg-mqtt-error hover:bg-red-700';
+ } else if (type === 'info') {
+ icon.innerHTML = '
';
+ titleEl.className = 'text-lg font-medium text-mqtt-azure';
+ button.className = 'w-full px-4 py-2 rounded-lg text-white font-medium bg-mqtt-azure hover:bg-blue-700';
+ }
+ // Show modal
+ modal.classList.remove('hidden');
+
+ // Auto-close after 5 seconds
+ setTimeout(() => {
+ closeNotification();
+ }, 5000);
+}
+
+function closeNotification() {
+ const modal = document.getElementById('notificationModal');
+ modal.classList.add('hidden');
+}
+
+// Filter table function for search
+function filterTable() {
+ const searchInput = document.getElementById('searchInput');
+ const searchTerm = searchInput.value.toLowerCase().trim();
+ const tableBody = document.getElementById('messageTableBody');
+ const rows = tableBody.getElementsByTagName('tr');
+ const rowsArray = Array.from(rows).filter(row => row.id !== 'noResultsRow');
+
+ rowsArray.forEach(function(row) {
+ if (searchTerm === '') {
+ // Show all rows when search is empty
+ row.style.display = '';
+ } else {
+ // Get searchable data from row attributes (only topic/source)
+ const topic = row.getAttribute('data-topic') || '';
+
+ // Search only in the topic/source field
+ const searchableText = topic;
+
+ // Show/hide row based on search match
+ if (searchableText.includes(searchTerm)) {
+ row.style.display = '';
+ } else {
+ row.style.display = 'none';
+ }
+ }
+ });
+
+ // Update search results count and show/hide no results message
+ const visibleRows = rowsArray.filter(row => row.style.display !== 'none').length;
+ const noResultsRow = document.getElementById('noResultsRow');
+
+ if (searchTerm !== '' && visibleRows === 0) {
+ noResultsRow.style.display = '';
+ } else {
+ noResultsRow.style.display = 'none';
+ }
+}
+
+// Tab switching functionality
+function switchTab(activeTab, activeContent, inactiveTab, inactiveContent) {
+ // Update tab appearance
+ activeTab.classList.add('active', 'border-mqtt-azure', 'text-mqtt-azure');
+ activeTab.classList.remove('border-transparent', 'text-mqtt-text-light');
+
+ inactiveTab.classList.remove('active', 'border-mqtt-azure', 'text-mqtt-azure');
+ inactiveTab.classList.add('border-transparent', 'text-mqtt-text-light');
+
+ // Show/hide content
+ activeContent.classList.remove('hidden');
+ inactiveContent.classList.add('hidden');
+}
+
+// DOMContentLoaded event handler
+document.addEventListener('DOMContentLoaded', function() {
+ const connectionTab = document.getElementById('connectionTab');
+ const dashboardTab = document.getElementById('dashboardTab');
+ const connectionContent = document.getElementById('connectionContent');
+ const dashboardContent = document.getElementById('dashboardContent');
+
+ // Connection tab click handler
+ connectionTab.addEventListener('click', function() {
+ switchTab(connectionTab, connectionContent, dashboardTab, dashboardContent);
+ });
+
+ // Dashboard tab click handler
+ dashboardTab.addEventListener('click', function() {
+ switchTab(dashboardTab, dashboardContent, connectionTab, connectionContent);
+ });
+
+ // MQTT Configuration Form Handlers
+ const mqttForm = document.getElementById('mqttConfigForm');
+ const testConnectionBtn = document.getElementById('testConnection');
+ const saveConfigBtn = document.getElementById('saveConfig');
+ const connectAndMonitorBtn = document.getElementById('connectAndMonitor');
+
+ console.log('โ
Form elements found:', 'mqttForm:', !!mqttForm, 'testConnectionBtn:', !!testConnectionBtn, 'saveConfigBtn:', !!saveConfigBtn, 'connectAndMonitorBtn:', !!connectAndMonitorBtn);
+
+ if (!testConnectionBtn) {
+ console.error('โ Test Connection button not found!');
+ return;
+ }
+ console.log('๐ง Attaching event listener to Test Connection button...');
+
+ // Test Connection Handler
+ testConnectionBtn.addEventListener('click', async function(e) {
+ console.log('๐ Test Connection button clicked!');
+ e.preventDefault();
+ e.stopPropagation();
+
+ const brokerAddress = document.getElementById('broker_address').value;
+ console.log('Broker address:', brokerAddress);
+
+ if (!brokerAddress) {
+ showNotification('error', 'Missing Broker Address', 'Please enter a broker address first.');
+ return;
+ }
+ // Show loading state
+ const originalHTML = this.innerHTML;
+ this.innerHTML = 'Testing Connection...';
+ this.disabled = true;
+
+ try {
+ // Get form data
+ const formData = new FormData(mqttForm);
+ const config = {};
+ // Convert FormData to object
+ for (const [key, value] of formData.entries()) {
+ config[key] = value;
+ }
+ // Handle checkboxes
+ config['require_tls'] = document.getElementById('require_tls').checked ? 'true' : 'false';
+ config['tls_disable_certs'] = document.getElementById('tls_disable_certs').checked ? 'true' : 'false';
+
+ console.log('๐ Testing MQTT connection with config:', config);
+ console.log('๐ค Sending request to /api/test-mqtt-connection');
+
+ // Send test request to backend API
+ const response = await fetch('/api/test-mqtt-connection', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify(config)
+ });
+
+ console.log('๐ฅ Response status:', response.status);
+ const result = await response.json();
+ console.log('๐ Response data:', result);
+
+ if (result.success) {
+ // Success
+ this.innerHTML = 'Connection Successful!';
+ this.classList.remove('bg-mqtt-secondary');
+ this.classList.add('bg-mqtt-success');
+
+ // Show success modal
+ showNotification('success', 'Connection Successful!', result.message);
+
+ // Reset button after delay
+ setTimeout(() => {
+ this.innerHTML = originalHTML;
+ this.classList.remove('bg-mqtt-success');
+ this.classList.add('bg-mqtt-secondary');
+ this.disabled = false;
+ }, 3000);
+ } else {
+ // Failure
+ throw new Error(result.error || result.message || 'Connection failed');
+ }
+ } catch (error) {
+ console.error('MQTT connection test failed:', error);
+
+ // Show error state
+ this.innerHTML = 'Connection Failed';
+ this.classList.remove('bg-mqtt-secondary');
+ this.classList.add('bg-mqtt-error');
+
+ // Show error modal
+ showNotification('error', 'Connection Failed', error.message);
+
+ // Reset button after delay
+ setTimeout(() => {
+ this.innerHTML = originalHTML;
+ this.classList.remove('bg-mqtt-error');
+ this.classList.add('bg-mqtt-secondary');
+ this.disabled = false;
+ }, 3000);
+ }
+ });
+
+ console.log('โ
Test Connection event listener attached successfully!');
+
+ // Save Configuration Handler
+ saveConfigBtn.addEventListener('click', function() {
+ const formData = new FormData(mqttForm);
+ const config = Object.fromEntries(formData);
+
+ // Save to localStorage (in real app, this would be saved to backend)
+ localStorage.setItem('mqttConfig', JSON.stringify(config));
+
+ this.innerHTML = 'Saved!';
+ this.classList.remove('bg-mqtt-warning');
+ this.classList.add('bg-mqtt-success');
+
+ setTimeout(() => {
+ this.innerHTML = 'Save Configuration';
+ this.classList.remove('bg-mqtt-success');
+ this.classList.add('bg-mqtt-warning');
+ }, 2000);
+ });
+
+ // Connect and Monitor Handler
+ connectAndMonitorBtn.addEventListener('click', async function(e) {
+ e.preventDefault();
+
+ const brokerAddress = document.getElementById('broker_address').value;
+ if (!brokerAddress) {
+ alert('Please enter a broker address first.');
+ return;
+ }
+ // Show loading state
+ const originalHTML = this.innerHTML;
+ this.innerHTML = 'Starting MQTT Job...';
+ this.disabled = true;
+
+ try {
+ // Get form data
+ const formData = new FormData(mqttForm);
+ const config = {};
+ // Convert FormData to object, handling checkboxes properly
+ for (const [key, value] of formData.entries()) {
+ config[key] = value;
+ }
+ // Handle checkboxes (they won't be in formData if unchecked)
+ config['require_tls'] = document.getElementById('require_tls').checked ? 'true' : 'false';
+ config['tls_disable_certs'] = document.getElementById('tls_disable_certs').checked ? 'true' : 'false';
+
+ // Save configuration locally
+ localStorage.setItem('mqttConfig', JSON.stringify(config));
+
+ console.log('๐ค Sending MQTT Configuration to server:', config);
+ console.log('๐ Data will be written to:', `${config.catalog}.${config.schema}.${config.table}`);
+
+ // Send configuration to backend API
+ const response = await fetch('/api/start-mqtt-job', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify(config)
+ });
+
+ const result = await response.json();
+
+ if (result.success) {
+ // Success - show confirmation and switch to dashboard
+ this.innerHTML = 'Job Started!';
+ this.classList.remove('bg-mqtt-azure');
+ this.classList.add('bg-mqtt-success');
+
+ showNotification('success', 'Job Started Successfully!', result.message || `Job ID: ${result.job_id}, Run ID: ${result.run_id}`);
+
+ // Switch to dashboard tab after a short delay
+ setTimeout(() => {
+ switchTab(dashboardTab, dashboardContent, connectionTab, connectionContent);
+ this.innerHTML = originalHTML;
+ this.classList.remove('bg-mqtt-success');
+ this.classList.add('bg-mqtt-azure');
+ this.disabled = false;
+ }, 2000);
+ } else {
+ // Error from server
+ throw new Error(result.error || 'Unknown error occurred');
+ }
+ } catch (error) {
+ console.error('Error starting MQTT job:', error);
+ alert(`Failed to start MQTT job: ${error.message}`);
+
+ // Reset button state
+ this.innerHTML = originalHTML;
+ this.disabled = false;
+ }
+ });
+
+ // Refresh Dashboard Data Handler
+ const refreshDataBtn = document.getElementById('refreshDataBtn');
+ console.log('Refresh button element:', refreshDataBtn);
+ if (refreshDataBtn) {
+ refreshDataBtn.addEventListener('click', async function() {
+ console.log('Refresh button clicked!');
+ const catalog = document.getElementById('catalog').value;
+ const schema = document.getElementById('schema').value;
+ const table = document.getElementById('table').value;
+
+ console.log('Form values:', { catalog, schema, table });
+
+ if (!catalog || !schema || !table) {
+ showNotification('error', 'Missing Information', 'Please enter catalog, schema, and table name.');
+ return;
+ }
+
+ // Show loading state
+ const originalHTML = this.innerHTML;
+ this.innerHTML = 'Refreshing...';
+ this.disabled = true;
+
+ try {
+ console.log('Sending refresh request...');
+ const warehouseId = document.getElementById('warehouse_id').value;
+ const response = await fetch('/api/refresh-data', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ catalog: catalog,
+ schema: schema,
+ table: table,
+ warehouse_id: warehouseId,
+ })
+ });
+
+ console.log('Response status:', response.status);
+ const result = await response.json();
+ console.log('Response data:', result);
+
+ if (result.success) {
+ showNotification('success', 'Data Refreshed!', `Loaded ${result.row_count} rows from ${catalog}.${schema}.${table}`);
+
+ // Update the table with new data
+ console.log('Updating table with data:', result.data);
+ updateTableWithData(result.data);
+
+ // Update the stats cards
+ if (result.stats) {
+ console.log('Updating stats:', result.stats);
+ updateStatsCards(result.stats);
+ }
+
+ // Reset button state
+ this.innerHTML = originalHTML;
+ this.disabled = false;
+ } else {
+ throw new Error(result.error || 'Failed to refresh data');
+ }
+ } catch (error) {
+ console.error('Error refreshing data:', error);
+ showNotification('error', 'Refresh Failed', error.message);
+
+ // Reset button state
+ this.innerHTML = originalHTML;
+ this.disabled = false;
+ }
+ });
+ }
+
+ // Stealth Auto-Refresh Function (runs every 30 seconds in the background)
+ async function stealthRefresh() {
+ const catalog = document.getElementById('catalog').value;
+ const schema = document.getElementById('schema').value;
+ const table = document.getElementById('table').value;
+ const warehouseId = document.getElementById('warehouse_id').value;
+
+ // Only refresh if all fields are filled
+ if (!catalog || !schema || !table) {
+ return;
+ }
+
+ try {
+ console.log('Auto-refreshing data in background...');
+ const response = await fetch('/api/refresh-data', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ catalog: catalog,
+ schema: schema,
+ table: table,
+ warehouse_id: warehouseId,
+ })
+ });
+
+ const result = await response.json();
+
+ if (result.success) {
+ console.log(`Auto-refresh successful: ${result.row_count} rows loaded`);
+ // Update the table silently without notification
+ updateTableWithData(result.data);
+ // Update the stats cards
+ if (result.stats) {
+ updateStatsCards(result.stats);
+ }
+ } else {
+ console.error('Auto-refresh failed:', result.error);
+ }
+ } catch (error) {
+ console.error('Auto-refresh error:', error);
+ }
+ }
+
+ // Start auto-refresh interval (every 30 seconds)
+ setInterval(stealthRefresh, 30000);
+ console.log('Auto-refresh enabled: Data will refresh every 30 seconds');
+
+ // TLS Settings Toggle Handler
+ const requireTlsCheckbox = document.getElementById('require_tls');
+ const tlsSettings = document.getElementById('tlsSettings');
+ const tlsDisableCertsCheckbox = document.getElementById('tls_disable_certs');
+ const certificateFields = document.querySelectorAll('#ca_certs, #certfile, #keyfile');
+
+ // Function to toggle TLS settings visibility
+ function toggleTlsSettings() {
+ const portField = document.getElementById('port');
+
+ if (requireTlsCheckbox.checked) {
+ tlsSettings.classList.remove('hidden');
+ // Auto-update port to TLS default if it's still at non-TLS default
+ if (portField.value === '1883') {
+ portField.value = '8883';
+ }
+ } else {
+ tlsSettings.classList.add('hidden');
+ // Auto-update port to non-TLS default if it's still at TLS default
+ if (portField.value === '8883') {
+ portField.value = '1883';
+ }
+ }
+ }
+ // Function to toggle certificate fields based on disable verification checkbox
+ function toggleCertificateFields() {
+ certificateFields.forEach(field => {
+ if (tlsDisableCertsCheckbox.checked) {
+ field.disabled = true;
+ field.classList.add('bg-gray-100', 'text-gray-400');
+ field.value = '';
+ } else {
+ field.disabled = false;
+ field.classList.remove('bg-gray-100', 'text-gray-400');
+ }
+ });
+ }
+ // Event listeners for TLS settings
+ requireTlsCheckbox.addEventListener('change', toggleTlsSettings);
+ tlsDisableCertsCheckbox.addEventListener('change', toggleCertificateFields);
+
+ // Initialize TLS settings visibility
+ toggleTlsSettings();
+ toggleCertificateFields();
+
+ // Load saved configuration on page load
+ const savedConfig = localStorage.getItem('mqttConfig');
+ if (savedConfig) {
+ const config = JSON.parse(savedConfig);
+ Object.keys(config).forEach(key => {
+ const element = document.getElementById(key);
+ if (element) {
+ if (element.type === 'checkbox') {
+ element.checked = config[key] === 'on';
+ } else {
+ element.value = config[key];
+ }
+ }
+ });
+ // Re-initialize TLS settings after loading config
+ toggleTlsSettings();
+ toggleCertificateFields();
+ }
+});
+
+// MQTT Message Search Functionality
+document.addEventListener('DOMContentLoaded', function() {
+ const searchInput = document.getElementById('searchInput');
+ const tableBody = document.getElementById('messageTableBody');
+ const rows = tableBody.getElementsByTagName('tr');
+
+ // Convert HTMLCollection to Array and filter out the "no results" row
+ const rowsArray = Array.from(rows).filter(row => row.id !== 'noResultsRow');
+
+ // Function to update stats counters based on visible rows
+ function updateStatsCounters(visibleRows) {
+ let critical = 0;
+ let processing = 0;
+ let processed = 0;
+
+ visibleRows.forEach(function(row) {
+ const severity = row.getAttribute('data-severity') || '';
+ const status = row.getAttribute('data-status') || '';
+
+ // Count critical messages that are not resolved
+ if (severity.toLowerCase() === 'critical' && status !== 'resolved') {
+ critical++;
+ }
+ // Count in progress messages
+ if (status === 'in_progress') {
+ processing++;
+ }
+ // Count resolved messages
+ if (status === 'resolved') {
+ processed++;
+ }
+ });
+
+ // Update the counter displays
+ const criticalElement = document.querySelector('.text-2xl.font-semibold.text-mqtt-error');
+ const processingElement = document.querySelector('.text-2xl.font-semibold.text-mqtt-warning');
+ const processedElement = document.querySelector('.text-2xl.font-semibold.text-mqtt-success');
+
+ if (criticalElement) criticalElement.textContent = critical;
+ if (processingElement) processingElement.textContent = processing;
+ if (processedElement) processedElement.textContent = processed;
+ }
+
+ searchInput.addEventListener('input', function() {
+ const searchTerm = this.value.toLowerCase().trim();
+ console.log('๐ Search triggered with term:', searchTerm);
+
+ rowsArray.forEach(function(row) {
+ if (searchTerm === '') {
+ // Show all rows when search is empty
+ row.style.display = '';
+ } else {
+ // Get searchable data from row attributes (only topic/source)
+ const topic = row.getAttribute('data-topic') || '';
+
+ // Search only in the topic/source field
+ const searchableText = topic;
+
+ // Debug logging for any search term
+ if (searchTerm.length > 0) {
+ console.log('Row topic: ' + topic + ', searchTerm: ' + searchTerm + ', match: ' + searchableText.includes(searchTerm));
+ }
+ // Show/hide row based on search match
+ if (searchableText.includes(searchTerm)) {
+ row.style.display = '';
+ } else {
+ row.style.display = 'none';
+ }
+ }
+ });
+
+ // Update counters for empty search (show all data)
+ if (searchTerm === '') {
+ updateStatsCounters(rowsArray);
+ }
+ // Update search results count and show/hide no results message
+ const visibleRows = rowsArray.filter(row => row.style.display !== 'none').length;
+ const noResultsRow = document.getElementById('noResultsRow');
+
+ if (searchTerm !== '' && visibleRows === 0) {
+ noResultsRow.style.display = '';
+ } else {
+ noResultsRow.style.display = 'none';
+ }
+ // Update stats counters based on visible rows
+ updateStatsCounters(rowsArray.filter(row => row.style.display !== 'none'));
+
+ console.log(`Search: "${searchTerm}" - ${visibleRows} results found`);
+ });
+
+ // Add search shortcut (Ctrl/Cmd + K)
+ document.addEventListener('keydown', function(e) {
+ if ((e.ctrlKey || e.metaKey) && e.key === 'k') {
+ e.preventDefault();
+ searchInput.focus();
+ searchInput.select();
+ }
+ });
+
+ console.log('๐ MQTT Search functionality initialized');
+ console.log('๐ก Tip: Use Ctrl/Cmd + K to focus search box');
+ console.log('๐ Found ' + rowsArray.length + ' searchable rows');
+
+ // Test search functionality immediately
+ console.log('๐งช Testing search functionality...');
+ rowsArray.forEach(function(row, index) {
+ const topic = row.getAttribute('data-topic') || '';
+ console.log('Row ' + (index + 1) + ' topic: ' + topic);
+ });
+});
diff --git a/mqtt/apps/MQTT Data Monitor/templates/dashboard.html b/mqtt/apps/MQTT Data Monitor/templates/dashboard.html
new file mode 100644
index 0000000..dd8196e
--- /dev/null
+++ b/mqtt/apps/MQTT Data Monitor/templates/dashboard.html
@@ -0,0 +1,445 @@
+
+
+
+
+
+ MQTT Data Monitor
+
+
+
+
+
+
+
+
+
+
+
+
+
+ MQTT Data Monitor
+
+
Real-time MQTT Message Monitoring & Analytics
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ MQTT Broker Configuration
+
+
Configure your MQTT broker connection settings to start monitoring messages.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Duplicated Messages
+
{{ stats.duplicated_messages }}
+
+
+
+
+
+
+
+
+
QOS=2 Messages
+
{{ stats.qos2_messages }}
+
+
+
+
+
+
+
+
+
Unique Topics
+
{{ stats.unique_topics }}
+
+
+
+
+
+
+
+
+
Total Messages
+
{{ stats.total_messages }}
+
+
+
+
+
+
+
+
+
Recent MQTT Messages
+
+
+
+
+
+ |
+ Message
+ |
+
+ Is Duplicate
+ |
+
+ QoS
+ |
+
+ Topic
+ |
+
+ Received Time
+ |
+
+
+
+
+
+
+
+
+ No MQTT messages found matching your search.
+ Search by topic (source) only.
+
+ |
+
+
+ {% if not curr_data or curr_data|length == 0 %}
+
+
+
+
+ No Data Loaded
+ Please specify your Catalog, Schema, and Table in the MQTT Connection tab,
+ then click "Refresh Dashboard Data" to load messages.
+
+ |
+
+ {% else %}
+ {% for row in curr_data %}
+ {% set is_dup = row.get('is_duplicate', 'false')|string|lower %}
+ {% set dup_color = 'bg-red-100 text-red-800' if is_dup == 'true' else 'bg-green-100 text-green-800' %}
+ {% set dup_text = 'Yes' if is_dup == 'true' else 'No' %}
+ {% set qos_value = row.get('qos', 0)|string %}
+ {% set qos_color = 'bg-blue-100 text-blue-800' if qos_value == '2' else 'bg-yellow-100 text-yellow-800' if qos_value == '1' else 'bg-gray-100 text-gray-800' %}
+ {% set message = row.get('message', '')|string %}
+ {% set message_display = message[:100] + '...' if message|length > 100 else message %}
+ {% set topic = row.get('topic', 'N/A')|string %}
+ {% set received_time = row.get('received_time', 'N/A')|string %}
+
+
+
+
+ {{ message_display }}
+
+ |
+
+
+ {{ dup_text }}
+
+ |
+
+
+ QoS {{ qos_value }}
+
+ |
+
+ {{ topic }}
+ |
+
+ {{ received_time }}
+ |
+
+ {% endfor %}
+ {% endif %}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/mqtt/examples/MQTT_data_source_notebook.ipynb b/mqtt/examples/MQTT_data_source_notebook.ipynb
new file mode 100644
index 0000000..cf820be
--- /dev/null
+++ b/mqtt/examples/MQTT_data_source_notebook.ipynb
@@ -0,0 +1,909 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "1574b02c-b343-42b8-94eb-48b11d3758ce",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "import datetime\n",
+ "\n",
+ "time = datetime.datetime.now()\n",
+ "dbutils.widgets.text(\"table\", f\"mqtt{time}\")\n",
+ "dbutils.widgets.text(\"database\", \"db_mqtt\")\n",
+ "dbutils.widgets.text(\"catalog\", \"mqtt_cat\")\n",
+ "dbutils.widgets.text(\"broker\", \"localhost\")\n",
+ "dbutils.widgets.text(\"username\", \"user\")\n",
+ "dbutils.widgets.text(\"password\", \"password\")\n",
+ "dbutils.widgets.text(\"topic\", \"#\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "12882d09-6969-460a-9e60-e274df7d1f55",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "table = dbutils.widgets.get(\"table\")\n",
+ "catalog = dbutils.widgets.get(\"catalog\")\n",
+ "database = dbutils.widgets.get(\"database\")\n",
+ "topic = dbutils.widgets.get(\"topic\")\n",
+ "broker = dbutils.widgets.get(\"broker\")\n",
+ "username = dbutils.widgets.get(\"username\")\n",
+ "password = dbutils.widgets.get(\"password\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "9dc666a2-59cb-4c80-8491-33ade3d0914b",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "print(f\"table: {table}\")\n",
+ "print(f\"catalog: {catalog}\")\n",
+ "print(f\"database: {database}\")\n",
+ "print(f\"topic: {topic}\")\n",
+ "print(f\"broker: {broker}\")\n",
+ "print(f\"username: {username}\")\n",
+ "print(f\"password: {password}\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "fbed97ba-6abe-4bdb-9c5f-fb477c3eb437",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "%pip install paho-mqtt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "3920ef9a-eea5-4343-bb70-79e642626a2a",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "import datetime\n",
+ "import ipaddress\n",
+ "import logging\n",
+ "import random\n",
+ "import re\n",
+ "import subprocess\n",
+ "import sys\n",
+ "import time\n",
+ "\n",
+ "from pyspark.errors import PySparkException\n",
+ "from pyspark.sql.datasource import DataSource, InputPartition, SimpleDataSourceStreamReader\n",
+ "from pyspark.sql.types import StructType, StructField, StringType\n",
+ "\n",
+ "logging.basicConfig()\n",
+ "logger = logging.getLogger(__name__)\n",
+ "logger.setLevel(logging.DEBUG)\n",
+ "\n",
+ "\n",
+ "class RangePartition(InputPartition):\n",
+ " def __init__(self, start, end):\n",
+ " self.start = start\n",
+ " self.end = end\n",
+ "\n",
+ "\n",
+ "class MqttDataSource(DataSource):\n",
+ " \"\"\"\n",
+ " A PySpark DataSource for reading MQTT messages from a broker.\n",
+ " \n",
+ " This data source allows you to stream MQTT messages into Spark DataFrames,\n",
+ " supporting various MQTT broker configurations including authentication,\n",
+ " SSL/TLS encryption, and different quality of service levels.\n",
+ " \n",
+ " Input validation is performed on critical parameters to ensure connection reliability.\n",
+ " \n",
+ " Supported options:\n",
+ " - broker_address: MQTT broker hostname or IP address (required, validated)\n",
+ " * Must be a valid hostname or IP address format\n",
+ " * Cannot be None, empty, or whitespace-only\n",
+ " - port: Broker port number (default: 8883, validated)\n",
+ " * Must be an integer in range 1-65535\n",
+ " - username: Authentication username (optional)\n",
+ " - password: Authentication password (optional)\n",
+ " - topic: MQTT topic to subscribe to (default: \"#\" for all topics)\n",
+ " - qos: Quality of Service level 0-2 (default: 0, validated)\n",
+ " * Must be 0, 1, or 2 (standard MQTT QoS levels)\n",
+ " - require_tls: Enable SSL/TLS encryption (default: true)\n",
+ " - keepalive: Keep alive interval in seconds (default: 60)\n",
+ " \n",
+ " Example usage:\n",
+ " spark.readStream.format(\"mqtt_pub_sub\")\n",
+ " .option(\"broker_address\", \"mqtt.example.com\")\n",
+ " .option(\"topic\", \"sensors/+/temperature\")\n",
+ " .option(\"username\", \"user\")\n",
+ " .option(\"password\", \"pass\")\n",
+ " .load()\n",
+ " \n",
+ " Raises:\n",
+ " ValueError: If broker_address, port, clean_session, or qos parameters are invalid.\n",
+ " \"\"\"\n",
+ "\n",
+ " @classmethod\n",
+ " def name(cls):\n",
+ " \"\"\"Returns the name of the data source.\"\"\"\n",
+ " return \"mqtt_pub_sub\"\n",
+ "\n",
+ " def __init__(self, options):\n",
+ " \"\"\"\n",
+ " Initialize the MQTT data source with configuration options.\n",
+ " \n",
+ " Args:\n",
+ " options (dict): Configuration options for the MQTT connection.\n",
+ " See class docstring for supported options.\n",
+ " \"\"\"\n",
+ " self.options = options\n",
+ "\n",
+ " def schema(self):\n",
+ " \"\"\"\n",
+ " Define the schema of the data source.\n",
+ " \n",
+ " Returns:\n",
+ " StructType: The schema of the data source.\n",
+ " \"\"\"\n",
+ " return StructType([\n",
+ " StructField(\"received_time\", StringType(), True),\n",
+ " StructField(\"topic\", StringType(), True),\n",
+ " StructField(\"message\", StringType(), True),\n",
+ " StructField(\"is_duplicate\", StringType(), True),\n",
+ " StructField(\"qos\", StringType(), True),\n",
+ " StructField(\"is_retained\", StringType(), True)\n",
+ " ])\n",
+ "\n",
+ " def streamReader(self, schema: StructType):\n",
+ " \"\"\"\n",
+ " Create and return a stream reader for MQTT data.\n",
+ " \n",
+ " Args:\n",
+ " schema (StructType): The schema for the streaming data.\n",
+ " \n",
+ " Returns:\n",
+ " MqttSimpleStreamReader: A stream reader instance configured for MQTT.\n",
+ " \"\"\"\n",
+ " return MqttSimpleStreamReader(schema, self.options)\n",
+ "\n",
+ "\n",
+ "class MqttSimpleStreamReader(SimpleDataSourceStreamReader):\n",
+ "\n",
+ " def __init__(self, schema, options):\n",
+ " \"\"\"\n",
+ " Initialize the MQTT simple stream reader with configuration options.\n",
+ " \n",
+ " Args:\n",
+ " schema (StructType): The schema for the streaming data.\n",
+ " options (dict): Configuration options for the MQTT connection.\n",
+ " See class docstring for supported options.\n",
+ " \"\"\"\n",
+ " self._install_paho_mqtt()\n",
+ " super().__init__()\n",
+ " self.topic = self._parse_topic(options.get(\"topic\", \"#\"))\n",
+ " self.broker_address = options.get(\"broker_address\")\n",
+ " self.require_tls = options.get(\"require_tls\", True)\n",
+ " self.port = int(options.get(\"port\", 8883))\n",
+ " self.username = options.get(\"username\", \"\")\n",
+ " self.password = options.get(\"password\", \"\")\n",
+ " self.qos = int(options.get(\"qos\", 2))\n",
+ " self.keep_alive = int(options.get(\"keepalive\", 60))\n",
+ " self.clean_session = options.get(\"clean_session\", False)\n",
+ " self.conn_timeout = int(options.get(\"conn_time\", 1))\n",
+ " self.clean_session = options.get(\"clean_session\", False)\n",
+ " self.ca_certs = options.get(\"ca_certs\", None)\n",
+ " self.certfile = options.get(\"certfile\", None)\n",
+ " self.keyfile = options.get(\"keyfile\", None)\n",
+ " self.tls_disable_certs = options.get(\"tls_disable_certs\", None)\n",
+ " \n",
+ " # Validate all input parameters\n",
+ " self._validate_input_parameters()\n",
+ " \n",
+ " if self.clean_session not in [True, False]:\n",
+ " raise ValueError(f\"Unsupported sesion: {self.clean_session}\")\n",
+ " self.client_id = f'spark-data-source-mqtt-{random.randint(0, 1000000)}'\n",
+ " self.current = 0\n",
+ " self.new_data = []\n",
+ "\n",
+ " def _install_paho_mqtt(self):\n",
+ " try:\n",
+ " import paho.mqtt.client\n",
+ " except ImportError:\n",
+ " logger.warn(\"Installing paho-mqtt...\")\n",
+ " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"paho-mqtt\"])\n",
+ " # importlib.reload(sys.modules[__name__])\n",
+ "\n",
+ " def _validate_input_parameters(self):\n",
+ " \"\"\"\n",
+ " Validate all input parameters for the MQTT connection.\n",
+ " \n",
+ " Raises:\n",
+ " ValueError: If any parameter is invalid.\n",
+ " \"\"\"\n",
+ " # Validate broker address\n",
+ " self._validate_broker_address()\n",
+ " \n",
+ " # Validate port range\n",
+ " self._validate_port()\n",
+ " \n",
+ " # Validate QoS level\n",
+ " self._validate_qos()\n",
+ "\n",
+ " def _validate_broker_address(self):\n",
+ " \"\"\"\n",
+ " Validate that the broker address is provided and properly formatted.\n",
+ " \n",
+ " Raises:\n",
+ " ValueError: If broker address is None, empty, or improperly formatted.\n",
+ " \"\"\"\n",
+ " if not self.broker_address:\n",
+ " raise ValueError(\"broker_address is required and cannot be None or empty\")\n",
+ " \n",
+ " if not isinstance(self.broker_address, str):\n",
+ " raise ValueError(\"broker_address must be a string\")\n",
+ " \n",
+ " self.broker_address = self.broker_address.strip()\n",
+ " if not self.broker_address:\n",
+ " raise ValueError(\"broker_address cannot be empty or just whitespace\")\n",
+ " \n",
+ " # Check if it's a valid IP address\n",
+ " try:\n",
+ " ipaddress.ip_address(self.broker_address)\n",
+ " return # Valid IP address\n",
+ " except ValueError:\n",
+ " pass # Not an IP address, check if it's a valid hostname\n",
+ " \n",
+ " # Validate hostname format\n",
+ " if not self._is_valid_hostname(self.broker_address):\n",
+ " raise ValueError(f\"broker_address '{self.broker_address}' is not a valid hostname or IP address\")\n",
+ "\n",
+ " def _is_valid_hostname(self, hostname):\n",
+ " \"\"\"\n",
+ " Check if a string is a valid hostname according to RFC standards.\n",
+ " \n",
+ " Args:\n",
+ " hostname (str): The hostname to validate.\n",
+ " \n",
+ " Returns:\n",
+ " bool: True if valid hostname, False otherwise.\n",
+ " \"\"\"\n",
+ " if len(hostname) > 253:\n",
+ " return False\n",
+ " \n",
+ " # Remove trailing dot if present\n",
+ " if hostname.endswith('.'):\n",
+ " hostname = hostname[:-1]\n",
+ " \n",
+ " # Hostname regex pattern\n",
+ " # Allows letters, numbers, hyphens, and dots\n",
+ " # Must start and end with alphanumeric characters\n",
+ " hostname_pattern = re.compile(\n",
+ " r'^(?!-)(?:[a-zA-Z0-9-]{1,63}(? 65535:\n",
+ " raise ValueError(f\"port must be in range 1-65535, got {self.port}\")\n",
+ "\n",
+ " def _validate_qos(self):\n",
+ " \"\"\"\n",
+ " Validate that the QoS level is one of the valid MQTT QoS values (0, 1, or 2).\n",
+ " \n",
+ " Raises:\n",
+ " ValueError: If QoS is not 0, 1, or 2.\n",
+ " \"\"\"\n",
+ " if not isinstance(self.qos, int):\n",
+ " raise ValueError(f\"qos must be an integer, got {type(self.qos).__name__}\")\n",
+ " \n",
+ " valid_qos_levels = [0, 1, 2]\n",
+ " if self.qos not in valid_qos_levels:\n",
+ " raise ValueError(f\"qos must be one of {valid_qos_levels}, got {self.qos}\")\n",
+ "\n",
+ " def _parse_topic(self, topic_str: str):\n",
+ " \"\"\"\n",
+ " TODO: add docs, implement parsing of topic string\n",
+ " \"\"\"\n",
+ " return topic_str\n",
+ "\n",
+ " def _configure_tls(self, client):\n",
+ " \"\"\"\n",
+ " Configure TLS settings on the MQTT client based on provided certificate options.\n",
+ " \"\"\"\n",
+ " if self.require_tls:\n",
+ " # Build tls_set arguments based on provided certificates\n",
+ " tls_args = {}\n",
+ " \n",
+ " if self.ca_certs:\n",
+ " tls_args['ca_certs'] = self.ca_certs\n",
+ " \n",
+ " if self.certfile:\n",
+ " tls_args['certfile'] = self.certfile\n",
+ " \n",
+ " if self.keyfile:\n",
+ " tls_args['keyfile'] = self.keyfile\n",
+ " \n",
+ " # Call tls_set with the appropriate parameters\n",
+ " if tls_args:\n",
+ " client.tls_set(**tls_args)\n",
+ " logger.info(f\"TLS configured with certificates: {list(tls_args.keys())}\")\n",
+ " else:\n",
+ " # Basic TLS without custom certificates\n",
+ " client.tls_set()\n",
+ " logger.info(\"Basic TLS enabled\")\n",
+ " else:\n",
+ " logger.info(\"TLS disabled\")\n",
+ "\n",
+ " def initialOffset(self):\n",
+ " return {\"offset\": 0}\n",
+ "\n",
+ " def latestOffset(self) -> dict:\n",
+ " \"\"\"\n",
+ " Returns the current latest offset that the next microbatch will read to.\n",
+ " \"\"\"\n",
+ " self.current += 1\n",
+ " return {\"offset\": self.current}\n",
+ "\n",
+ " def partitions(self, start: dict, end: dict):\n",
+ "\n",
+ " \"\"\"\n",
+ " Plans the partitioning of the current microbatch defined by start and end offset. It\n",
+ " needs to return a sequence of :class:`InputPartition` objects.\n",
+ " \"\"\"\n",
+ " return [RangePartition(start[\"offset\"], end[\"offset\"])]\n",
+ "\n",
+ " def read(self, partition):\n",
+ " \"\"\"\n",
+ " Read MQTT messages from the broker.\n",
+ " \n",
+ " Args:\n",
+ " partition (RangePartition): The partition to read from.\n",
+ " \n",
+ " Returns:\n",
+ " Iterator[list]: An iterator of lists containing the MQTT message data.\n",
+ " The list contains the following elements:\n",
+ " - received_time: The time the message was received.\n",
+ " - topic: The topic of the message.\n",
+ " - message: The payload of the message.\n",
+ " - is_duplicate: Whether the message is a duplicate.\n",
+ " - qos: The quality of service level of the message.\n",
+ " - is_retained: Whether the message is retained.\n",
+ " \n",
+ " Raises:\n",
+ " Exception: If the connection to the broker fails.\n",
+ " \"\"\"\n",
+ " import paho.mqtt.client as mqttClient\n",
+ "\n",
+ " def _get_mqtt_client():\n",
+ " return mqttClient.Client(mqttClient.CallbackAPIVersion.VERSION1, self.client_id,\n",
+ " clean_session=self.clean_session)\n",
+ "\n",
+ " client = _get_mqtt_client()\n",
+ " \n",
+ " # Configure TLS with certificates if provided\n",
+ " self._configure_tls(client)\n",
+ " \n",
+ " client.username_pw_set(self.username, self.password)\n",
+ "\n",
+ " def on_connect(client, userdata, flags, rc):\n",
+ " if rc == 0:\n",
+ " client.subscribe(self.topic, qos=self.qos)\n",
+ " logger.warning(f\"Connected to broker {self.broker_address} on port {self.port} with topic {self.topic}\")\n",
+ " else:\n",
+ " logger.error(f\"Connection failed to broker {self.broker_address} on port {self.port} with topic {self.topic}\")\n",
+ "\n",
+ " def on_message(client, userdata, message):\n",
+ " msg_data = [\n",
+ " str(datetime.datetime.now()),\n",
+ " message.topic,\n",
+ " str(message.payload.decode(\"utf-8\", \"ignore\")),\n",
+ " message.dup,\n",
+ " message.qos,\n",
+ " message.retain\n",
+ " ]\n",
+ " logger.warning(msg_data)\n",
+ " self.new_data.append(msg_data)\n",
+ "\n",
+ " client.on_connect = on_connect\n",
+ " client.on_message = on_message\n",
+ "\n",
+ " try:\n",
+ " client.connect(self.broker_address, self.port, self.keep_alive)\n",
+ " except Exception as e:\n",
+ " connection_context = {\n",
+ " \"broker_address\": self.broker_address,\n",
+ " \"port\": self.port,\n",
+ " \"topic\": self.topic,\n",
+ " \"client_id\": self.client_id,\n",
+ " \"require_tls\": self.require_tls,\n",
+ " \"keepalive\": self.keep_alive,\n",
+ " \"qos\": self.qos,\n",
+ " \"clean_session\": self.clean_session,\n",
+ " \"conn_timeout\": self.conn_timeout\n",
+ " }\n",
+ " \n",
+ " error_msg = f\"Failed to connect to MQTT broker. Connection details: {connection_context}\"\n",
+ " logger.exception(error_msg, exc_info=e)\n",
+ " \n",
+ " # Re-raise with enhanced context\n",
+ " raise ConnectionError(error_msg) from e\n",
+ " client.loop_start() # Use loop_start to run the loop in a separate thread\n",
+ "\n",
+ " time.sleep(self.conn_timeout) # Wait for messages for the specified timeout\n",
+ "\n",
+ " client.loop_stop() # Stop the loop after the timeout\n",
+ " client.disconnect()\n",
+ " logger.warning(\"current state of data: %s\", self.new_data)\n",
+ "\n",
+ " return (iter(self.new_data))\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "class MqttSimpleStreamWriter():\n",
+ " #To be implemented\n",
+ " def __init__(self, schema, options):\n",
+ " pass\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "a7a953ea-6883-42af-9af0-ebcf782b6078",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "spark.dataSource.register(MqttDataSource)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "f13f115f-4cc8-4739-9da9-c2bf9db0512f",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "dataframe = (\n",
+ " spark.readStream.format(\"mqtt_pub_sub\")\n",
+ " .option(\"topic\", topic)\n",
+ " .option(\"broker_address\", broker)\n",
+ " .option(\"username\", username)\n",
+ " .option(\"password\", password)\n",
+ " .option(\"conn_timeout\", \"1\")\n",
+ " .option(\"qos\", 2)\n",
+ " .load()\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "f2e1cb57-2128-4825-b283-d705fed382a9",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "import uuid\n",
+ "checkpoint = f\"/Volumes/{catalog}/{database}/checkpoints/check_{uuid.uuid4()}\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {
+ "byteLimit": 2048000,
+ "rowLimit": 10000
+ },
+ "inputWidgets": {},
+ "nuid": "0071ae01-f92e-4325-8109-a36cf6f0c096",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": [
+ "( dataframe\n",
+ " .writeStream\n",
+ " .format(\"delta\")\n",
+ " .option(\"checkpointLocation\", checkpoint)\n",
+ " .outputMode(\"append\")\n",
+ " .table(f\"{catalog}.{database}.{table}\")\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 0,
+ "metadata": {
+ "application/vnd.databricks.v1+cell": {
+ "cellMetadata": {},
+ "inputWidgets": {},
+ "nuid": "f4dc01a7-edb1-408f-973b-5b917371d672",
+ "showTitle": false,
+ "tableResultSettingsMap": {},
+ "title": ""
+ }
+ },
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "application/vnd.databricks.v1+notebook": {
+ "computePreferences": {
+ "hardware": {
+ "accelerator": null,
+ "gpuPoolId": null,
+ "memory": null
+ }
+ },
+ "dashboards": [],
+ "environmentMetadata": {
+ "base_environment": "",
+ "dependencies": [
+ "paho-mqtt>=2.0.0"
+ ],
+ "environment_version": "2"
+ },
+ "inputWidgetPreferences": {
+ "autoRunOnWidgetChange": "auto-run-selected-command"
+ },
+ "language": "python",
+ "notebookMetadata": {
+ "pythonIndentUnit": 2,
+ "widgetLayout": [
+ {
+ "breakBefore": false,
+ "name": "broker",
+ "width": 218
+ },
+ {
+ "breakBefore": false,
+ "name": "catalog",
+ "width": 218
+ },
+ {
+ "breakBefore": false,
+ "name": "database",
+ "width": 218
+ },
+ {
+ "breakBefore": false,
+ "name": "param_10",
+ "width": 62
+ },
+ {
+ "breakBefore": false,
+ "name": "param_11",
+ "width": 62
+ },
+ {
+ "breakBefore": false,
+ "name": "param_8",
+ "width": 62
+ },
+ {
+ "breakBefore": false,
+ "name": "param_9",
+ "width": 62
+ },
+ {
+ "breakBefore": false,
+ "name": "password",
+ "width": 218
+ },
+ {
+ "breakBefore": false,
+ "name": "table",
+ "width": 218
+ },
+ {
+ "breakBefore": false,
+ "name": "topic",
+ "width": 218
+ },
+ {
+ "breakBefore": false,
+ "name": "username",
+ "width": 218
+ }
+ ]
+ },
+ "notebookName": "MQTT_data_source_notebook",
+ "widgets": {
+ "broker": {
+ "currentValue": "localhost",
+ "nuid": "ed744063-20f6-4b22-be1a-7bff46e47f71",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "localhost",
+ "label": null,
+ "name": "broker",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "localhost",
+ "label": null,
+ "name": "broker",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ },
+ "catalog": {
+ "currentValue": "mqtt_cat",
+ "nuid": "9fb56318-0e9b-489d-b4b4-cf5e39e25e1a",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "mqtt_cat",
+ "label": null,
+ "name": "catalog",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "mqtt_cat",
+ "label": null,
+ "name": "catalog",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ },
+ "database": {
+ "currentValue": "db_mqtt",
+ "nuid": "3d45be95-5ef2-43b4-8c4f-8544ede64d20",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "db_mqtt",
+ "label": null,
+ "name": "database",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "db_mqtt",
+ "label": null,
+ "name": "database",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ },
+ "password": {
+ "currentValue": "password",
+ "nuid": "a59268ce-d461-47fc-abb8-3beafddb69a6",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "password",
+ "label": null,
+ "name": "password",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "password",
+ "label": null,
+ "name": "password",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ },
+ "table": {
+ "currentValue": "mqtt_v0",
+ "nuid": "e3777c0e-3522-4522-baeb-789fda26f28a",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "mqtt2025-10-20 04:05:02.680662",
+ "label": null,
+ "name": "table",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "mqtt2025-10-20 04:05:02.680662",
+ "label": null,
+ "name": "table",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ },
+ "topic": {
+ "currentValue": "#",
+ "nuid": "0b1ddaa8-3ff6-41c4-923d-8a866275d0e0",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "#",
+ "label": null,
+ "name": "topic",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "#",
+ "label": null,
+ "name": "topic",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ },
+ "username": {
+ "currentValue": "user",
+ "nuid": "9d5a0218-22ff-47cf-9739-89e2157895ef",
+ "typedWidgetInfo": {
+ "autoCreated": false,
+ "defaultValue": "user",
+ "label": null,
+ "name": "username",
+ "options": {
+ "validationRegex": null,
+ "widgetDisplayType": "Text"
+ },
+ "parameterDataType": "String"
+ },
+ "widgetInfo": {
+ "defaultValue": "user",
+ "label": null,
+ "name": "username",
+ "options": {
+ "autoCreated": null,
+ "validationRegex": null,
+ "widgetType": "text"
+ },
+ "widgetType": "text"
+ }
+ }
+ }
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}
diff --git a/mqtt/src/python_datasource_connectors/mqtt_streaming.py b/mqtt/src/python_datasource_connectors/mqtt_streaming.py
index ebc0a84..02d72e1 100644
--- a/mqtt/src/python_datasource_connectors/mqtt_streaming.py
+++ b/mqtt/src/python_datasource_connectors/mqtt_streaming.py
@@ -1,6 +1,8 @@
import datetime
+import ipaddress
import logging
import random
+import re
import subprocess
import sys
import time
@@ -28,13 +30,19 @@ class MqttDataSource(DataSource):
supporting various MQTT broker configurations including authentication,
SSL/TLS encryption, and different quality of service levels.
+ Input validation is performed on critical parameters to ensure connection reliability.
+
Supported options:
- - broker_address: MQTT broker hostname or IP address (required)
- - port: Broker port number (default: 8883)
+ - broker_address: MQTT broker hostname or IP address (required, validated)
+ * Must be a valid hostname or IP address format
+ * Cannot be None, empty, or whitespace-only
+ - port: Broker port number (default: 8883, validated)
+ * Must be an integer in range 1-65535
- username: Authentication username (optional)
- password: Authentication password (optional)
- topic: MQTT topic to subscribe to (default: "#" for all topics)
- - qos: Quality of Service level 0-2 (default: 0)
+ - qos: Quality of Service level 0-2 (default: 0, validated)
+ * Must be 0, 1, or 2 (standard MQTT QoS levels)
- require_tls: Enable SSL/TLS encryption (default: true)
- keepalive: Keep alive interval in seconds (default: 60)
@@ -45,6 +53,9 @@ class MqttDataSource(DataSource):
.option("username", "user")
.option("password", "pass")
.load()
+
+ Raises:
+ ValueError: If broker_address, port, clean_session, or qos parameters are invalid.
"""
@classmethod
@@ -110,17 +121,21 @@ def __init__(self, schema, options):
self.port = int(options.get("port", 8883))
self.username = options.get("username", "")
self.password = options.get("password", "")
- self.qos = int(options.get("qos", 0))
+ self.qos = int(options.get("qos", 2))
self.keep_alive = int(options.get("keepalive", 60))
self.clean_session = options.get("clean_session", False)
self.conn_timeout = int(options.get("conn_time", 1))
- self.clean_sesion = options.get("clean_sesion", False)
+ self.clean_session = options.get("clean_session", False)
self.ca_certs = options.get("ca_certs", None)
self.certfile = options.get("certfile", None)
self.keyfile = options.get("keyfile", None)
self.tls_disable_certs = options.get("tls_disable_certs", None)
- if self.clean_sesion not in [True, False]:
- raise ValueError(f"Unsupported sesion: {self.clean_sesion}")
+
+ # Validate all input parameters
+ self._validate_input_parameters()
+
+ if self.clean_session not in [True, False]:
+ raise ValueError(f"Unsupported sesion: {self.clean_session}")
self.client_id = f'spark-data-source-mqtt-{random.randint(0, 1000000)}'
self.current = 0
self.new_data = []
@@ -133,6 +148,103 @@ def _install_paho_mqtt(self):
subprocess.check_call([sys.executable, "-m", "pip", "install", "paho-mqtt"])
# importlib.reload(sys.modules[__name__])
+ def _validate_input_parameters(self):
+ """
+ Validate all input parameters for the MQTT connection.
+
+ Raises:
+ ValueError: If any parameter is invalid.
+ """
+ # Validate broker address
+ self._validate_broker_address()
+
+ # Validate port range
+ self._validate_port()
+
+ # Validate QoS level
+ self._validate_qos()
+
+ def _validate_broker_address(self):
+ """
+ Validate that the broker address is provided and properly formatted.
+
+ Raises:
+ ValueError: If broker address is None, empty, or improperly formatted.
+ """
+ if not self.broker_address:
+ raise ValueError("broker_address is required and cannot be None or empty")
+
+ if not isinstance(self.broker_address, str):
+ raise ValueError("broker_address must be a string")
+
+ self.broker_address = self.broker_address.strip()
+ if not self.broker_address:
+ raise ValueError("broker_address cannot be empty or just whitespace")
+
+ # Check if it's a valid IP address
+ try:
+ ipaddress.ip_address(self.broker_address)
+ return # Valid IP address
+ except ValueError:
+ pass # Not an IP address, check if it's a valid hostname
+
+ # Validate hostname format
+ if not self._is_valid_hostname(self.broker_address):
+ raise ValueError(f"broker_address '{self.broker_address}' is not a valid hostname or IP address")
+
+ def _is_valid_hostname(self, hostname):
+ """
+ Check if a string is a valid hostname according to RFC standards.
+
+ Args:
+ hostname (str): The hostname to validate.
+
+ Returns:
+ bool: True if valid hostname, False otherwise.
+ """
+ if len(hostname) > 253:
+ return False
+
+ # Remove trailing dot if present
+ if hostname.endswith('.'):
+ hostname = hostname[:-1]
+
+ # Hostname regex pattern
+ # Allows letters, numbers, hyphens, and dots
+ # Must start and end with alphanumeric characters
+ hostname_pattern = re.compile(
+ r'^(?!-)(?:[a-zA-Z0-9-]{1,63}(? 65535:
+ raise ValueError(f"port must be in range 1-65535, got {self.port}")
+
+ def _validate_qos(self):
+ """
+ Validate that the QoS level is one of the valid MQTT QoS values (0, 1, or 2).
+
+ Raises:
+ ValueError: If QoS is not 0, 1, or 2.
+ """
+ if not isinstance(self.qos, int):
+ raise ValueError(f"qos must be an integer, got {type(self.qos).__name__}")
+
+ valid_qos_levels = [0, 1, 2]
+ if self.qos not in valid_qos_levels:
+ raise ValueError(f"qos must be one of {valid_qos_levels}, got {self.qos}")
+
def _parse_topic(self, topic_str: str):
"""
TODO: add docs, implement parsing of topic string