@jefferyann-db
Contributor

  • Add v1 of the MQTT data monitor app with Databricks Apps and credentials
  • Add app requirements.txt with all libraries required to run
  • Add app.yaml with startup command
  • Create a dedicated app folder called MQTT Data Monitor

[TODOs]

  • Add a parameterized MQTT ingest notebook to the codebase
  • Add a publish job to the codebase to make pub-sub testing easier
  • Add stop-job functionality so jobs can be cleaned up from the UI
  • Save connector configs in the UI for ease of use

@dmoore247 self-requested a review on October 15, 2025 22:07
@dmoore247 added the enhancement (New feature or request) label on Oct 15, 2025
Collaborator

@dmoore247 left a comment

Looking good. Identified a few items to address please.

@@ -0,0 +1,6 @@
psycopg2-binary==2.9.9
Collaborator

I don't see that this is necessary; I could not find a Lakebase reference. Please remove it if it is not used.

Contributor Author

done

Collaborator

This psycopg2-binary dependency still appears in the requirements.txt file.

self.require_tls = True if str_tls == "true" else False
self.require_tls = options.get("require_tls", True)
self.port = int(options.get("port", 8883))
self.username = options.get("username", "")
Collaborator

Please add input validation logic.

Collaborator

High Priority Fixes

  • Fix the typo: clean_sesion → clean_session
  • Implement topic parsing: complete the _parse_topic() method
  • Add input validation: validate broker_address, port ranges, QoS values
  • Separate frontend code: extract HTML/CSS/JS to separate files
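
The validation item above could look roughly like the following. This is only a sketch, assuming the connector receives its settings as an `options` dict as in the snippet under review. The helper names `validate_mqtt_options` and `_parse_bool`, the error messages, and the defaults for `qos` and `clean_session` are hypothetical; the default port 8883 is taken from the reviewed code.

```python
def _parse_bool(value, name):
    """Accept real booleans as well as the strings "true" / "false"."""
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "false"):
        return text == "true"
    raise ValueError(f"{name} must be true or false, got {value!r}")


def _parse_int(value, name):
    """Convert to int, raising a ValueError that names the offending option."""
    try:
        return int(value)
    except (TypeError, ValueError):
        raise ValueError(f"{name} must be an integer, got {value!r}") from None


def validate_mqtt_options(options):
    """Validate and normalize MQTT connector options (hypothetical helper)."""
    broker = str(options.get("broker_address", "")).strip()
    if not broker:
        raise ValueError("broker_address must be a non-empty host name or IP")

    port = _parse_int(options.get("port", 8883), "port")
    if not 1 <= port <= 65535:  # valid TCP port range
        raise ValueError(f"port must be between 1 and 65535, got {port}")

    # MQTT defines exactly three QoS levels: 0, 1 and 2.
    qos = _parse_int(options.get("qos", 0), "qos")  # default 0 is an assumption
    if qos not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1 or 2, got {qos}")

    return {
        "broker_address": broker,
        "port": port,
        "qos": qos,
        # Default False is an assumption, not taken from the reviewed code.
        "clean_session": _parse_bool(options.get("clean_session", False),
                                     "clean_session"),
    }
```

Failing fast with a `ValueError` that names the option keeps a bad setting from surfacing later as an opaque connection error.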

Contributor Author

Typos fixed
Refactored app code into separate files
Added a sample MQTT job that the app triggers to pull data
Added input validation for broker, port, QoS and clean_session

[TODO] Topic parsing will come in v2. We need to be a little clever about how we distribute the topics so that data is pulled in parallel and takes advantage of all the compute.

Collaborator

@dmoore247 left a comment

Good with changes.

    # Convert DataFrame to list of dictionaries
    return df.to_dict('records')
except Exception as e:
    print(f"Database query failed: {e}")
Collaborator

Please use the python logger
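
One way this could look, as a sketch only: the function name `fetch_records` and the `run_query` callable are hypothetical stand-ins for the app's own query code, and returning an empty list on failure is an assumption about the desired behavior.

```python
import logging

# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)


def fetch_records(run_query, query):
    """Run a query and return the rows as a list of dictionaries.

    `run_query` is a hypothetical callable returning a pandas-like
    DataFrame. On failure the error is logged and an empty list is
    returned (an assumption about what the caller expects).
    """
    try:
        df = run_query(query)
        # Convert DataFrame to list of dictionaries
        return df.to_dict("records")
    except Exception:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Database query failed")
        return []
```

Unlike `print`, the logger call carries a level and a traceback, and its destination can be changed through logging configuration without touching this code.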



def create_job(client, notebook_path, cluster_id):
# cluster_id = (
Collaborator

Please remove dead code

# Callback function for when the client connects
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected successfully to MQTT Broker!")
Collaborator

Please use python logger.
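
For the connect callback specifically, a sketch might log success at INFO and failures at ERROR with the reason attached. The callback signature is the one from the reviewed code; the logger name `mqtt_monitor` and the `CONNACK_REASONS` table name are hypothetical, and the reason texts paraphrase the MQTT 3.1.1 CONNACK return codes.

```python
import logging

logger = logging.getLogger("mqtt_monitor")  # hypothetical logger name

# CONNACK return codes 1-5 as defined by MQTT 3.1.1 (0 means accepted).
CONNACK_REASONS = {
    1: "unacceptable protocol version",
    2: "client identifier rejected",
    3: "server unavailable",
    4: "bad user name or password",
    5: "not authorized",
}


# Callback function for when the client connects
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        logger.info("Connected successfully to MQTT broker")
    else:
        reason = CONNACK_REASONS.get(rc, "unknown reason")
        logger.error("MQTT connection failed (rc=%s): %s", rc, reason)
```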

connection_result['error'] = f'Failed to connect to {mqtt_server_config["host"]} and {port} and {mqtt_server_config["username"]} are not working'

# Disconnect
client.loop_stop()
Collaborator

Shouldn't the disconnect be in a finally block in case of an exception?
Also, disconnect may itself raise an exception.
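
The concern above can be sketched as follows. It assumes only that the client exposes `loop_stop()` and `disconnect()`, which paho-mqtt's `Client` does; the wrapper `run_with_cleanup` and the `work` callable are hypothetical names, not part of the reviewed code.

```python
import logging

logger = logging.getLogger(__name__)


def run_with_cleanup(client, work):
    """Run `work(client)` and always stop the loop and disconnect.

    `work` is a hypothetical callable holding the connect-and-read logic.
    """
    try:
        return work(client)
    finally:
        # Guard each cleanup step separately so that a failure in one
        # neither skips the other nor masks an exception from `work`.
        for step in (client.loop_stop, client.disconnect):
            try:
                step()
            except Exception:
                logger.exception("MQTT cleanup step %s failed", step.__name__)
```

Logging instead of re-raising inside `finally` is a design choice here: if the read itself failed, the caller sees that original error rather than a secondary disconnect error.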

Contributor Author

The client handles this manually. Because the read is stateless, we connect a session for a brief period, read all the data, and close out the loop, which automatically saves our session IDs and gives us all the data that came in while we were disconnected. This is why we create a client session ID every time we use the class to trigger a new data stream.

'critical': duplicated,
'in_progress': qos2_messages,
'resolved_today': unique_topics,
'avg_response_time': row_count
Collaborator

Doesn't make sense to return row_count for avg_response_time, does it?
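
One possible fix is to name each key after the value it carries. The sketch below reuses the four variable names from the reviewed dict; the function name `build_mqtt_stats` and the new key names are hypothetical, and any template or script that reads the old keys would need the same rename.

```python
def build_mqtt_stats(duplicated, qos2_messages, unique_topics, row_count):
    """Return dashboard stats under keys that describe their contents.

    All key names are illustrative, not taken from the reviewed code.
    """
    return {
        "duplicate_messages": duplicated,
        "qos2_messages": qos2_messages,
        "unique_topics": unique_topics,
        "total_messages": row_count,
    }
```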

query = f"SELECT message, is_duplicate, qos, topic, received_time FROM {catalog}.{schema}.{table} ORDER BY received_time DESC LIMIT 100"

# Fetch data using get_data function
curr_data = get_data(query, "4b9b953939869799")
Collaborator

What's this literal? Magic values should be handled differently than dropped in the middle of the code.
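
A common alternative is to read such a value from configuration. The sketch below assumes the literal identifies the compute that runs the query (for example a SQL warehouse ID), which the source does not confirm; the environment variable name and the helper `get_warehouse_id` are hypothetical.

```python
import os

# Hypothetical variable name; the deployment would define it, for example
# in the app's environment configuration.
WAREHOUSE_ID_ENV = "MQTT_MONITOR_WAREHOUSE_ID"


def get_warehouse_id(env=os.environ):
    """Return the configured ID, failing loudly when it is missing."""
    value = env.get(WAREHOUSE_ID_ENV, "").strip()
    if not value:
        raise RuntimeError(f"{WAREHOUSE_ID_ENV} is not set")
    return value
```

The call in the reviewed code would then become something like `get_data(query, get_warehouse_id())`, so the ID can differ per environment without a code change.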

}), 400

# Build the query
query = f"SELECT message, is_duplicate, qos, topic, received_time FROM {catalog}.{schema}.{table} ORDER BY received_time DESC LIMIT 100"
Collaborator

Generating SQL dynamically with f-strings poses a risk of SQL injection. Try parameterized queries.
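
Bound parameters in most database drivers cover values rather than identifiers, so catalog, schema, and table names usually need a different safeguard. The sketch below validates each name against a conservative pattern before quoting it. The helper name `build_recent_messages_query`, the pattern, and the use of backtick quoting (as in Databricks SQL) are assumptions; the column list and ordering are from the reviewed query.

```python
import re

# Conservative allow-pattern for identifiers: letters, digits, underscore.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_recent_messages_query(catalog, schema, table, limit=100):
    """Build the dashboard query from validated identifiers (sketch)."""
    parts = []
    for name, value in (("catalog", catalog), ("schema", schema), ("table", table)):
        if not isinstance(value, str) or not _IDENTIFIER.fullmatch(value):
            raise ValueError(f"invalid {name} name: {value!r}")
        parts.append(f"`{value}`")

    limit = int(limit)  # rejects non-numeric input
    if limit < 1:
        raise ValueError(f"limit must be positive, got {limit}")

    return (
        "SELECT message, is_duplicate, qos, topic, received_time "
        f"FROM {'.'.join(parts)} ORDER BY received_time DESC LIMIT {limit}"
    )
```

Any user-supplied values added later (for example a topic filter in a WHERE clause) should go through the driver's parameter binding rather than string formatting.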

"""Main MQTT Data Monitor dashboard page"""
stats = get_mqtt_stats()

html = '''
Collaborator

There are better ways of serving HTML in an app, e.g. pulling the html code from a file.
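
As a framework-neutral sketch of that suggestion, the page can live in its own file and be filled in at request time. The helper name `render_page` and the `$name` placeholder convention are hypothetical; only the standard library is used. If the app is built on Flask, which the source does not state, `flask.render_template` with a `templates/` folder would be the more usual route.

```python
import html
from pathlib import Path
from string import Template


def render_page(template_path, values):
    """Read an HTML file and fill in `$name` placeholders.

    Values are HTML-escaped before substitution so that message content
    cannot inject markup into the page.
    """
    text = Path(template_path).read_text(encoding="utf-8")
    escaped = {key: html.escape(str(value)) for key, value in values.items()}
    # safe_substitute leaves unknown or malformed placeholders untouched,
    # which tolerates stray "$" characters in inline CSS or JavaScript.
    return Template(text).safe_substitute(escaped)
```

Keeping the markup in a file also lets it be edited and linted as HTML instead of as a Python string literal.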

Collaborator

@dmoore247 left a comment

LGTM

@dmoore247 merged commit 70940ff into main on Oct 31, 2025
1 of 6 checks passed
@dmoore247 deleted the feat/mqtt_app branch on October 31, 2025 15:11

3 participants