Skip to content

Commit 70940ff

Browse files
authored
Merge pull request #12 from databricks-industry-solutions/feat/mqtt_app
Add MQTT Data Monitoring app
2 parents d3916cd + 12ad7ba commit 70940ff

File tree

9 files changed

+2789
-7
lines changed

9 files changed

+2789
-7
lines changed

mqtt/apps/.gitkeep

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Add your Databricks Apps here
2+
# Each app should be in its own subdirectory with:
3+
# - app.py (main application file)
4+
# - app.yaml (configuration)
5+
# - requirements.txt (dependencies)

mqtt/apps/MQTT Data Monitor/app.py

Lines changed: 374 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,374 @@
1+
from flask import Flask, request, jsonify, render_template
2+
import logging
3+
from datetime import datetime, timedelta
4+
import pandas
5+
from databricks import sql
6+
from databricks.sdk.core import Config
7+
from databricks.sdk import WorkspaceClient
8+
from databricks.sdk.service import jobs
9+
import os
10+
import json
11+
import time
12+
import re
13+
import paho.mqtt.client as mqtt_client
14+
15+
# Root logging: INFO level, each record carries timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)

# Raise werkzeug's threshold to ERROR so its lower-level records are dropped.
werkzeug_log = logging.getLogger('werkzeug')
werkzeug_log.setLevel(logging.ERROR)

# Flask application object that the route decorators attach to.
flask_app = Flask(__name__)

# Databricks workspace client, constructed once at import time.
client = WorkspaceClient()
32+
33+
# Database connection function
34+
def get_data(query, warehouse_id, params=None):
    """Run a SQL query against a Databricks SQL warehouse and return its rows.

    Args:
        query: SQL text to execute.
        warehouse_id: ID of the SQL warehouse; used to build the HTTP path
            ``/sql/1.0/warehouses/<warehouse_id>``.
        params: Optional query parameters forwarded to ``pandas.read_sql``.

    Returns:
        A list of dicts, one per result row (``DataFrame.to_dict('records')``).
        An empty list is returned when no warehouse ID or workspace host is
        available, or when the query fails. Failures are logged, never raised,
        so callers can show a message instead of crashing.
    """
    try:
        cfg = Config()
        if not (warehouse_id and cfg.host):
            # Nothing to connect to: return the same "no rows" value as a
            # failed query instead of falling through without a result.
            logger.warning("Database query skipped: warehouse ID or workspace host is not set")
            return []

        with sql.connect(
            server_hostname=cfg.host,
            http_path=f"/sql/1.0/warehouses/{warehouse_id}",
            # Zero-argument provider that hands back the SDK's authenticate callable.
            credentials_provider=lambda: cfg.authenticate,
        ) as connection:
            if params:
                df = pandas.read_sql(query, connection, params=params)
            else:
                df = pandas.read_sql(query, connection)

        # The DataFrame is fully materialized, so the connection can be closed first.
        return df.to_dict('records')
    except Exception as e:
        logger.error(f"Database query failed: {e}")
        # Return empty list on error so we can show a message
        return []
54+
55+
56+
def create_job(client, notebook_path, cluster_id):
    """Create a one-task notebook job and return the creation response.

    Args:
        client: Workspace client whose ``jobs.create`` method is called.
        notebook_path: Path of the notebook to run; the task is declared with
            source ``WORKSPACE``.
        cluster_id: Passed as the task's ``existing_cluster_id``.

    Returns:
        Whatever ``client.jobs.create`` returns.
    """
    notebook = jobs.NotebookTask(
        notebook_path=notebook_path,
        source=jobs.Source("WORKSPACE"),
    )
    mqtt_task = jobs.Task(
        task_key="mqtt",
        description="mqtt",
        notebook_task=notebook,
        existing_cluster_id=cluster_id,
        timeout_seconds=0,
    )
    # The nanosecond timestamp gives every created job a distinct name.
    job_name = f"mqtt_{time.time_ns()}"
    return client.jobs.create(name=job_name, tasks=[mqtt_task])
71+
72+
def run_job(client, created_job, mqtt_config):
    """Trigger a run of ``created_job`` with the user's MQTT settings.

    Args:
        client: Workspace client whose ``jobs.run_now`` method is called.
        created_job: Object exposing the ``job_id`` of the job to run.
        mqtt_config: Mapping of user-supplied settings; missing keys fall
            back to the defaults listed below.

    Returns:
        Whatever ``client.jobs.run_now`` returns.
    """
    # (notebook parameter name, key read from mqtt_config, default if absent)
    param_spec = (
        ("catalog", "catalog", "dbdemos"),
        ("database", "schema", "dbdemos_mqtt"),
        ("table", "table", "mqtt_v5"),
        ("broker", "broker_address", ""),
        ("port", "port", "8883"),
        ("username", "username", ""),
        ("password", "password", ""),
        ("topic", "topic", "#"),
        ("qos", "qos", "0"),
        ("require_tls", "require_tls", "false"),
        ("keepalive", "keepalive", "60"),
    )
    notebook_params = {
        param_name: mqtt_config.get(config_key, fallback)
        for param_name, config_key, fallback in param_spec
    }
    return client.jobs.run_now(
        job_id=created_job.job_id,
        notebook_params=notebook_params,
    )
91+
92+
93+
def mqtt_remote_client(mqtt_server_config):
    """Test connectivity to an MQTT broker using the supplied settings.

    Args:
        mqtt_server_config: Mapping with the broker settings. ``host`` is
            required. Optional keys: ``port`` (default 8883), ``keepalive``
            (default 60), ``username``/``password`` (applied only when both
            are non-empty), ``require_tls`` and ``tls_disable_certs`` (the
            string ``'true'`` enables them), ``ca_certs``, ``certfile`` and
            ``keyfile``.

    Returns:
        dict with keys ``success`` (bool), ``message`` (str) and ``error``
        (``None`` or str). Exceptions are captured into ``error`` rather
        than raised.
    """
    connection_result = {
        'success': False,
        'message': '',
        'error': None
    }
    # Filled in by on_connect so a refusal code from the broker can be reported.
    connack = {}
    loop_started = False
    try:
        # Callback function for when the client connects
        def on_connect(client, userdata, flags, rc):
            connack['rc'] = rc
            if rc == 0:
                logger.info("Connected successfully to MQTT Broker!")
            else:
                logger.error(f"Failed to connect, return code {rc}")

        # Named mqtt_conn so it does not shadow the module-level Databricks ``client``.
        mqtt_conn = mqtt_client.Client(
            mqtt_client.CallbackAPIVersion.VERSION1,
            client_id="mqtt_connection_test",
            clean_session=True,
        )

        # Set username and password if provided
        if mqtt_server_config.get("username") and mqtt_server_config.get("password"):
            mqtt_conn.username_pw_set(
                username=mqtt_server_config["username"],
                password=mqtt_server_config["password"]
            )

        # Set TLS if required
        if mqtt_server_config.get("require_tls") == 'true':
            import ssl
            tls_config = {}
            for option in ("ca_certs", "certfile", "keyfile"):
                if mqtt_server_config.get(option):
                    tls_config[option] = mqtt_server_config[option]

            # If disable certs verification is checked
            if mqtt_server_config.get("tls_disable_certs") == 'true':
                tls_config['cert_reqs'] = ssl.CERT_NONE

            mqtt_conn.tls_set(**tls_config)

        mqtt_conn.on_connect = on_connect

        # Attempt connection
        port = int(mqtt_server_config.get("port", 8883))
        keepalive = int(mqtt_server_config.get("keepalive", 60))
        host = mqtt_server_config["host"]

        mqtt_conn.connect(host, port, keepalive)

        # Start the loop to process network traffic
        mqtt_conn.loop_start()
        loop_started = True

        # Give it a moment to connect
        time.sleep(2)

        # Check if connected
        if mqtt_conn.is_connected():
            connection_result['success'] = True
            connection_result['message'] = f'Successfully connected to MQTT broker at {host}:{port}'
        else:
            # .get() avoids a KeyError when no username was supplied.
            username = mqtt_server_config.get("username") or '<none>'
            detail = f'No MQTT session was established with {host}:{port} (username: {username})'
            if 'rc' in connack:
                detail += f'; broker returned code {connack["rc"]}'
            connection_result['message'] = 'Failed to connect to MQTT broker'
            connection_result['error'] = detail

    except Exception as e:
        connection_result['success'] = False
        connection_result['message'] = 'Connection failed'
        connection_result['error'] = str(e)
    finally:
        # Always stop the network loop once it has been started, even when an
        # exception interrupted the check, so no loop is left running.
        if loop_started:
            try:
                mqtt_conn.loop_stop()
                mqtt_conn.disconnect()
            except Exception as cleanup_error:
                logger.warning(f"MQTT test client cleanup failed: {cleanup_error}")

    return connection_result
166+
167+
168+
# Rows currently shown on the dashboard. Starts empty; data is loaded once the
# user specifies catalog/schema/table and clicks refresh.
curr_data = []


def get_mqtt_stats():
    """Summarise the rows held in the module-level ``curr_data``.

    Returns:
        dict with four integer counters: ``duplicated_messages`` (rows whose
        ``is_duplicate`` stringifies to "true", case-insensitively),
        ``qos2_messages`` (rows whose ``qos`` stringifies to "2"),
        ``unique_topics`` (distinct stringified ``topic`` values) and
        ``total_messages`` (row count). All are 0 when there is no data.
    """
    rows = curr_data
    if not rows:
        return {
            'duplicated_messages': 0,
            'qos2_messages': 0,
            'unique_topics': 0,
            'total_messages': 0,
        }

    duplicate_count = sum(
        1 for row in rows
        if str(row.get('is_duplicate', 'false')).lower() == 'true'
    )
    qos2_count = sum(1 for row in rows if str(row.get('qos', 0)) == '2')
    # A row without a topic contributes the empty string as its topic.
    topics = {str(row.get('topic', '')) for row in rows}

    return {
        'duplicated_messages': duplicate_count,
        'qos2_messages': qos2_count,
        'unique_topics': len(topics),
        'total_messages': len(rows),
    }
197+
198+
199+
@flask_app.route('/api/test-mqtt-connection', methods=['POST'])
def test_mqtt_connection():
    """API endpoint to test an MQTT broker connection.

    Expects a JSON object in the request body. ``broker_address`` is
    required; the other connection settings are optional and fall back to
    the defaults below.

    Returns:
        The result dict from ``mqtt_remote_client`` as JSON, with status 200
        when the connection succeeded and 400 when it did not. A body that is
        not a JSON object, or one without ``broker_address``, yields 400.
        Unexpected errors yield 500.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so bad requests are answered with 400 rather than 500.
        mqtt_config = request.get_json(silent=True)
        if not isinstance(mqtt_config, dict):
            return jsonify({
                'success': False,
                'error': 'Request body must be a JSON object'
            }), 400

        # Validate required fields
        if not mqtt_config.get('broker_address'):
            return jsonify({
                'success': False,
                'error': 'Broker address is required'
            }), 400

        # Prepare config for MQTT test
        # NOTE(review): the default port here is 1883 while mqtt_remote_client
        # falls back to 8883 - confirm which default is intended.
        mqtt_server_config = {
            'host': mqtt_config.get('broker_address'),
            'port': mqtt_config.get('port', '1883'),
            'username': mqtt_config.get('username', ''),
            'password': mqtt_config.get('password', ''),
            'require_tls': mqtt_config.get('require_tls', 'false'),
            'ca_certs': mqtt_config.get('ca_certs', ''),
            'certfile': mqtt_config.get('certfile', ''),
            'keyfile': mqtt_config.get('keyfile', ''),
            'tls_disable_certs': mqtt_config.get('tls_disable_certs', 'false'),
            'keepalive': mqtt_config.get('keepalive', '60')
        }

        # Test the connection
        result = mqtt_remote_client(mqtt_server_config)
        status_code = 200 if result['success'] else 400
        return jsonify(result), status_code

    except Exception as e:
        return jsonify({
            'success': False,
            'message': 'Connection test failed',
            'error': str(e)
        }), 500
240+
241+
242+
@flask_app.route('/api/start-mqtt-job', methods=['POST'])
def start_mqtt_job():
    """API endpoint to start the MQTT data ingestion job with user configuration.

    Expects a JSON object in the request body. ``broker_address``,
    ``catalog``, ``schema``, ``table`` and ``notebook_path`` are required;
    ``cluster_id`` is passed through to ``create_job`` unchanged.

    Returns:
        JSON with ``success``, ``job_id``, ``run_id`` and ``message`` once the
        job has been created and triggered. Invalid input yields 400 and any
        other failure yields 500, each with ``success: False`` and an
        ``error`` string.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so bad requests are answered with 400 rather than 500.
        mqtt_config = request.get_json(silent=True)
        if not isinstance(mqtt_config, dict):
            return jsonify({
                'success': False,
                'error': 'Request body must be a JSON object'
            }), 400

        # Validate required fields
        if not mqtt_config.get('broker_address'):
            return jsonify({
                'success': False,
                'error': 'Broker address is required'
            }), 400

        catalog = mqtt_config.get('catalog')
        schema = mqtt_config.get('schema')
        table = mqtt_config.get('table')
        if not catalog or not schema or not table:
            return jsonify({
                'success': False,
                'error': 'Catalog, Schema, and Table name are required'
            }), 400

        # Get notebook_path and cluster_id from config
        notebook_path = mqtt_config.get('notebook_path')
        cluster_id = mqtt_config.get('cluster_id')

        # Reject the request before creating a job that has no notebook to run.
        if not notebook_path:
            return jsonify({
                'success': False,
                'error': 'Notebook path is required'
            }), 400

        # Create the job
        created_job = create_job(client, notebook_path, cluster_id)

        # Run the job with user configuration
        run = run_job(client, created_job, mqtt_config)

        return jsonify({
            'success': True,
            'job_id': created_job.job_id,
            'run_id': run.run_id,
            'message': f'MQTT data ingestion job started successfully. Data will be written to {catalog}.{schema}.{table}'
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
288+
289+
290+
def _quote_identifier(name):
    """Return *name* as a backtick-delimited SQL identifier, doubling any backtick inside it."""
    return "`" + str(name).replace("`", "``") + "`"


@flask_app.route('/api/refresh-data', methods=['POST'])
def refresh_data():
    """API endpoint to refresh dashboard data from the specified table.

    Expects a JSON object with ``catalog``, ``schema`` and ``table``
    (required) and an optional ``warehouse_id``. The latest 100 rows, ordered
    by ``received_time`` descending, are stored in the module-level
    ``curr_data``.

    Returns:
        JSON with ``success``, ``message``, ``row_count``, ``data`` (the rows)
        and ``stats`` (from ``get_mqtt_stats``). Invalid input yields 400 and
        unexpected errors yield 500, each with ``success: False`` and an
        ``error`` string.
    """
    global curr_data
    try:
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so bad requests are answered with 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Request body must be a JSON object'
            }), 400

        catalog = data.get('catalog')
        schema = data.get('schema')
        table = data.get('table')
        # NOTE(review): this fallback warehouse ID is hard-coded - confirm it
        # is valid for the workspaces this app is deployed to.
        warehouse_id = data.get('warehouse_id', '4b9b953939869799')  # Default fallback

        # Validate required fields
        if not catalog or not schema or not table:
            return jsonify({
                'success': False,
                'error': 'Catalog, Schema, and Table name are required'
            }), 400

        # Query parameters bind values, not identifiers, so the table name
        # cannot be passed as a parameter. Each part is backtick-quoted (with
        # embedded backticks doubled) so user input cannot break out of it.
        table_ref = ".".join(
            _quote_identifier(part) for part in (catalog, schema, table)
        )
        query = (
            "SELECT message, is_duplicate, qos, topic, received_time "
            f"FROM {table_ref} ORDER BY received_time DESC LIMIT 100"
        )

        # Fetch data using get_data function
        curr_data = get_data(query, warehouse_id)

        # Calculate stats from refreshed data
        stats = get_mqtt_stats()

        return jsonify({
            'success': True,
            'message': f'Data refreshed from {catalog}.{schema}.{table}',
            'row_count': len(curr_data) if curr_data else 0,
            'data': curr_data,  # Return the actual data to update the UI
            'stats': stats  # Return the calculated stats
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
329+
330+
331+
@flask_app.route('/')
def dashboard():
    """Render the main MQTT Data Monitor dashboard page.

    The ``dashboard.html`` template receives ``stats`` (the result of
    ``get_mqtt_stats``) and ``curr_data`` (the module-level rows).
    """
    template_context = {
        'stats': get_mqtt_stats(),
        'curr_data': curr_data,
    }
    return render_template('dashboard.html', **template_context)
337+
338+
def get_alert_icon(alert_type):
    """Return the icon class name for an alert type.

    The first keyword found in ``alert_type`` (checked in the order listed
    below) decides the icon; ``'fa-broadcast-tower'`` is returned when none
    of them matches.
    """
    icon_by_keyword = (
        ('IoT Sensor', 'fa-thermometer-half'),
        ('Device Status', 'fa-wifi'),
        ('System Metrics', 'fa-chart-bar'),
        ('Security', 'fa-shield-alt'),
    )
    for keyword, icon in icon_by_keyword:
        if keyword in alert_type:
            return icon
    return 'fa-broadcast-tower'
349+
350+
def get_status_color(status):
    """Return the CSS classes for a status badge.

    ``'new'``, ``'in_progress'`` and ``'resolved'`` map to blue, yellow and
    green respectively; any other value gets the gray classes.
    """
    classes_by_status = (
        ('new', 'bg-blue-100 text-blue-800'),
        ('in_progress', 'bg-yellow-100 text-yellow-800'),
        ('resolved', 'bg-green-100 text-green-800'),
    )
    for known_status, css_classes in classes_by_status:
        if status == known_status:
            return css_classes
    return 'bg-gray-100 text-gray-800'
359+
360+
def get_severity_color(severity):
    """Return the CSS classes for a severity badge.

    ``'CRITICAL'``, ``'WARNING'`` and ``'INFO'`` map to red, yellow and blue
    respectively; any other value gets the gray classes.
    """
    classes_by_severity = (
        ('CRITICAL', 'bg-red-100 text-red-800'),
        ('WARNING', 'bg-yellow-100 text-yellow-800'),
        ('INFO', 'bg-blue-100 text-blue-800'),
    )
    return next(
        (css for level, css in classes_by_severity if severity == level),
        'bg-gray-100 text-gray-800',
    )
369+
370+
if __name__ == '__main__':
    # Local entry point: log a banner, then serve on all interfaces, port 8001.
    logger.info("Starting MQTT Data Monitor Dashboard")
    logger.info("MQTT Message Processing & Analytics Platform")
    logger.info("=" * 50)
    # The interactive debugger lets anyone who can reach the server execute
    # code, so with host 0.0.0.0 it must be opt-in: set FLASK_DEBUG=1 to enable.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    flask_app.run(debug=debug_enabled, host='0.0.0.0', port=8001)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
command: [
2+
"flask",
3+
"--app",
4+
"app.py",
5+
"run"
6+
]
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
databricks-sdk>=0.66.0
2+
flask>=2.3.0
3+
pandas>=1.5.0
4+
paho-mqtt>=2.0.0

0 commit comments

Comments
 (0)