99import os
1010import json
1111import time
12+ import re
1213import paho .mqtt .client as mqtt_client
1314
15+ # Configure logging
16+ logging .basicConfig (
17+ level = logging .INFO ,
18+ format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
19+ )
1420
15- log = logging .getLogger ('werkzeug' )
16- log .setLevel (logging .ERROR )
21+ # Create logger for this application
22+ logger = logging .getLogger (__name__ )
23+
24+ # Suppress werkzeug logging to reduce noise
25+ werkzeug_log = logging .getLogger ('werkzeug' )
26+ werkzeug_log .setLevel (logging .ERROR )
1727
1828flask_app = Flask (__name__ )
1929
2030
2131client = WorkspaceClient ()
2232
2333# Database connection function
24- def get_data (query , warehouse_id ):
34+ def get_data (query , warehouse_id , params = None ):
2535 """Execute query with fallback to demo data"""
2636 try :
2737 cfg = Config ()
@@ -31,19 +41,19 @@ def get_data(query, warehouse_id):
3141 http_path = f"/sql/1.0/warehouses/{ warehouse_id } " ,
3242 credentials_provider = lambda : cfg .authenticate
3343 ) as connection :
34- df = pandas .read_sql (query , connection )
44+ if params :
45+ df = pandas .read_sql (query , connection , params = params )
46+ else :
47+ df = pandas .read_sql (query , connection )
3548 # Convert DataFrame to list of dictionaries
3649 return df .to_dict ('records' )
3750 except Exception as e :
38- print (f"Database query failed: { e } " )
51+ logger . error (f"Database query failed: { e } " )
3952 # Return empty list on error so we can show a message
4053 return []
4154
4255
4356def create_job (client , notebook_path , cluster_id ):
44- # cluster_id = (
45- # w.clusters.ensure_cluster_is_running(os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]
46- # )
4757
4858 created_job = client .jobs .create (
4959 name = f"mqtt_{ time .time_ns ()} " ,
@@ -91,9 +101,9 @@ def mqtt_remote_client(mqtt_server_config):
91101 # Callback function for when the client connects
92102 def on_connect (client , userdata , flags , rc ):
93103 if rc == 0 :
94- print ("Connected successfully to MQTT Broker!" )
104+ logger . info ("Connected successfully to MQTT Broker!" )
95105 else :
96- print (f"Failed to connect, return code { rc } \n " )
106+ logger . error (f"Failed to connect, return code { rc } " )
97107
98108 client = mqtt_client .Client (mqtt_client .CallbackAPIVersion .VERSION1 , client_id = "mqtt_connection_test" , clean_session = True )
99109
@@ -163,10 +173,10 @@ def get_mqtt_stats():
163173 """Get MQTT message statistics from data"""
164174 if not curr_data or len (curr_data ) == 0 :
165175 return {
166- 'critical ' : 0 ,
167- 'in_progress ' : 0 ,
168- 'resolved_today ' : 0 ,
169- 'avg_response_time ' : 0
176+ 'duplicated_messages ' : 0 ,
177+ 'qos2_messages ' : 0 ,
178+ 'unique_topics ' : 0 ,
179+ 'total_messages ' : 0
170180 }
171181
172182 # Count duplicated messages
@@ -179,10 +189,10 @@ def get_mqtt_stats():
179189 row_count = len (curr_data )
180190
181191 return {
182- 'critical ' : duplicated ,
183- 'in_progress ' : qos2_messages ,
184- 'resolved_today ' : unique_topics ,
185- 'avg_response_time ' : row_count
192+ 'duplicated_messages ' : duplicated ,
193+ 'qos2_messages ' : qos2_messages ,
194+ 'unique_topics ' : unique_topics ,
195+ 'total_messages ' : row_count
186196 }
187197
188198
@@ -250,8 +260,8 @@ def start_mqtt_job():
250260 }), 400
251261
252262 # Get notebook_path and cluster_id from config
253- notebook_path = mqtt_config .
get (
'notebook_path' , '/Workspace/Users/[email protected] /mqtt/MQTT_data_source_v0' )
254- cluster_id = mqtt_config .get ('cluster_id' , '0709-132523-cnhxf2p6' )
263+ notebook_path = mqtt_config .get ('notebook_path' )
264+ cluster_id = mqtt_config .get ('cluster_id' )
255265
256266 # Create the job
257267 created_job = create_job (client , notebook_path , cluster_id )
@@ -286,7 +296,7 @@ def refresh_data():
286296 catalog = data .get ('catalog' )
287297 schema = data .get ('schema' )
288298 table = data .get ('table' )
289- # warehouse_id = data.get('warehouse_id', DEFAULT_WAREHOUSE_ID)
299+ warehouse_id = data .get ('warehouse_id' , '4b9b953939869799' ) # Default fallback
290300
291301 # Validate required fields
292302 if not catalog or not schema or not table :
@@ -295,11 +305,11 @@ def refresh_data():
295305 'error' : 'Catalog, Schema, and Table name are required'
296306 }), 400
297307
298- # Build the query
299- query = f "SELECT message, is_duplicate, qos, topic, received_time FROM { catalog } . { schema } . { table } ORDER BY received_time DESC LIMIT 100 "
308+ # Build the query with parameterized values
309+ query = "SELECT message, is_duplicate, qos, topic, received_time FROM %s.%s.%s ORDER BY received_time DESC LIMIT %s "
300310
301- # Fetch data using get_data function
302- curr_data = get_data (query , "4b9b953939869799" )
311+ # Fetch data using get_data function with parameters
312+ curr_data = get_data (query , warehouse_id , ( catalog , schema , table , 100 ) )
303313
304314 # Calculate stats from refreshed data
305315 stats = get_mqtt_stats ()
@@ -358,7 +368,7 @@ def get_severity_color(severity):
358368 return 'bg-gray-100 text-gray-800'
359369
360370if __name__ == '__main__' :
361- print ( "🚀 Starting MQTT Data Monitor Dashboard" )
362- print ( "📡 MQTT Message Processing & Analytics Platform" )
363- print ("=" * 50 )
371+ logger . info ( " Starting MQTT Data Monitor Dashboard" )
372+ logger . info ( " MQTT Message Processing & Analytics Platform" )
373+ logger . info ("=" * 50 )
364374 flask_app .run (debug = True , host = '0.0.0.0' , port = 8001 )
0 commit comments