-
Notifications
You must be signed in to change notification settings - Fork 35
Tracker service v0.2.0: mqtt support #923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…ent lambda to keep RobotVision warning‑free under modern C++ standards without changing behavior.
- Remove redundant -G Ninja from cmake --preset commands (generator is already set via conan toolchain) - Consolidate clean-all and clean-local into single clean target - Clean now removes local builds and docker images
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements MQTT support for the tracker service v0.2.0, enabling message-based communication for camera detection ingestion and scene output publication.
Changes:
- Added MQTT client and message handler components with TLS/mTLS support and automatic reconnection
- Implemented comprehensive unit and service tests with schema validation
- Updated configuration system to support MQTT settings and environment variable overrides
Reviewed changes
Copilot reviewed 31 out of 33 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tracker/test/unit/mqtt_client_test.cpp | Unit tests for static MQTT client functions (client ID generation, backoff calculation) |
| tracker/test/unit/message_handler_test.cpp | Unit tests for message handler with mocked MQTT client |
| tracker/test/unit/config_loader_test.cpp | Extended config loader tests for MQTT settings and TLS configuration |
| tracker/test/unit/CMakeLists.txt | Added MQTT-related test sources and Paho MQTT library dependency |
| tracker/test/service/utils/schema.py | JSON schema validation utilities for service tests |
| tracker/test/service/utils/docker.py | Docker helper utilities with duplicated code at end of file |
| tracker/test/service/utils/certs.py | TLS certificate generation for mTLS testing |
| tracker/test/service/test_mqtt.py | Service tests for MQTT connection resilience and message flow |
| tracker/test/service/test_shutdown.py | Minor formatting cleanup for shutdown test |
| tracker/test/service/requirements.txt | Added MQTT and crypto dependencies |
| tracker/test/service/mosquitto.conf | Updated broker config with TLS support |
| tracker/test/service/docker-compose.yaml | New unified compose file replacing test-specific version |
| tracker/test/service/conftest.py | Enhanced fixtures with TLS certificate support |
| tracker/src/mqtt_client.cpp | MQTT client implementation with reconnection and TLS |
| tracker/src/message_handler.cpp | Message handler for camera detections and scene output |
| tracker/src/main.cpp | Integrated MQTT client and message handler into main loop |
| tracker/src/config_loader.cpp | Extended config loader for MQTT and TLS settings |
| tracker/schema/config.schema.json | Updated schema with MQTT TLS configuration |
| tracker/inc/mqtt_client.hpp | MQTT client interface and implementation headers |
| tracker/inc/message_handler.hpp | Message handler interface with schema validation |
| tracker/inc/env_vars.hpp | Added MQTT-related environment variables |
| tracker/inc/config_loader.hpp | Extended config structures for MQTT and TLS |
| tracker/conanfile.txt | Replaced simdjson with paho-mqtt-cpp |
| tracker/README.md | Added mosquitto-clients installation instructions |
| tracker/Makefile | Added MQTT testing targets and coverage exclusions |
| tracker/Dockerfile | Fixed hadolint warning for mkdir/ldd command |
| tracker/CMakeLists.txt | Replaced simdjson with Paho MQTT in build system |
| sample_data/docker-compose-dl-streamer-example.yml | Updated tracker service with MQTT configuration |
| .github/resources/.prettierignore | Excluded CMake build directories |
tdorauintc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review done: I haven't gone through MQTT client implementation and tests yet but when a second pair of eyes looks at those and the current comments are addressed, we can merge.
| // Log parsed message details | ||
| size_t total_detections = 0; | ||
| for (const auto& [category, detections] : message->objects) { | ||
| total_detections += detections.size(); | ||
| } | ||
| LOG_DEBUG("Parsed message: camera={}, timestamp={}, detections={}", message->id, | ||
| message->timestamp, total_detections); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we going to use total_detections e.g. for observability metrics? If not, it could be calculated only if log level is debug (optimization possible).
| LOG_INFO("MessageHandler stopping (received: {}, published: {}, rejected: {})", | ||
| received_count_.load(), published_count_.load(), rejected_count_.load()); | ||
|
|
||
| mqtt_client_->setMessageCallback(nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation only clears the callback but doesn't actually unsubscribe from the MQTT topic. This means that the broker is not informed and still sends messages to the client even though they're being ignored and cleanup is incomplete - if you call start() again, you might have duplicate subscriptions.
For symmetry between start and stop, it would make sense if the stop() method unsubscribed from TOPIC_CAMERA_DATA (we would need to add unsubscribe method to MQTT client interface).
| // Format output topic: scenescape/data/scene/{scene_id}/{thing_type} | ||
| std::ostringstream output_topic; | ||
| output_topic << "scenescape/data/scene/" << DUMMY_SCENE_ID << "/" << DUMMY_THING_TYPE; | ||
|
|
||
| mqtt_client_->publish(output_topic.str(), scene_message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be optimized (std::ostringstream heap allocation - with internal buffer allocation, .str() - extracts the string by copying the buffer contents) by using string concatenation with reserve() or TOPIC_SCENE_DATA_PATTERN and formatter.
| LOG_DEBUG("Published track to: {} ({} bytes)", output_topic.str(), scene_message.size()); | ||
| } | ||
|
|
||
| std::string MessageHandler::extractCameraId(const std::string& topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small optimization suggestion: we could avoid copy on return by using std::string_view here.
| continue; | ||
| } | ||
|
|
||
| std::vector<Detection> detections; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to use pre-allocation with reserve() to reduce copies.
| "/infrastructure/mqtt/tls/client_cert_path"; | ||
| constexpr char INFRASTRUCTURE_MQTT_TLS_CLIENT_KEY_PATH[] = | ||
| "/infrastructure/mqtt/tls/client_key_path"; | ||
| constexpr char INFRASTRUCTURE_MQTT_TLS_VERIFY_SERVER[] = "/infrastructure/mqtt/tls/verify_server"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: This code is perfectly self-explanatory, I'd remove comments.
Also I wouldn't break lines, we still fit to 120.
| constexpr char INFRASTRUCTURE_MQTT_TLS_VERIFY_SERVER[] = "/infrastructure/mqtt/tls/verify_server"; | |
| constexpr char OBSERVABILITY_LOGGING_LEVEL[] = "/observability/logging/level"; | |
| constexpr char INFRASTRUCTURE_TRACKER_HEALTHCHECK_PORT[] = "/infrastructure/tracker/healthcheck/port"; | |
| constexpr char INFRASTRUCTURE_TRACKER_SCHEMA_VALIDATION[] = "/infrastructure/tracker/schema_validation"; | |
| constexpr char INFRASTRUCTURE_MQTT_HOST[] = "/infrastructure/mqtt/host"; | |
| constexpr char INFRASTRUCTURE_MQTT_PORT[] = "/infrastructure/mqtt/port"; | |
| constexpr char INFRASTRUCTURE_MQTT_INSECURE[] = "/infrastructure/mqtt/insecure"; | |
| constexpr char INFRASTRUCTURE_MQTT_TLS[] = "/infrastructure/mqtt/tls"; | |
| constexpr char INFRASTRUCTURE_MQTT_TLS_CA_CERT_PATH[] = "/infrastructure/mqtt/tls/ca_cert_path"; | |
| constexpr char INFRASTRUCTURE_MQTT_TLS_CLIENT_CERT_PATH[] = "/infrastructure/mqtt/tls/client_cert_path"; | |
| constexpr char INFRASTRUCTURE_MQTT_TLS_CLIENT_KEY_PATH[] = "/infrastructure/mqtt/tls/client_key_path"; | |
| constexpr char INFRASTRUCTURE_MQTT_TLS_VERIFY_SERVER[] = "/infrastructure/mqtt/tls/verify_server"; |
|
|
||
| // Global MQTT client pointer for readiness checks | ||
| std::shared_ptr<tracker::MqttClient> g_mqtt_client; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: this code is self-explanatory, I'd remove this comment and empty line.
| // Global MQTT client pointer for readiness checks | |
| std::shared_ptr<tracker::MqttClient> g_mqtt_client; | |
| std::shared_ptr<tracker::MqttClient> g_mqtt_client; |
|
|
||
| } // namespace | ||
|
|
||
| void clearEmptyProxyEnvVars() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This utility function is never used (except for UT).
| void clearEmptyProxyVars() { | ||
| const char* proxy_vars[] = {"http_proxy", "HTTP_PROXY", "https_proxy", | ||
| "HTTPS_PROXY", "no_proxy", "NO_PROXY"}; | ||
|
|
||
| bool cleared_any = false; | ||
| for (const char* var : proxy_vars) { | ||
| const char* value = std::getenv(var); | ||
| if (value != nullptr && value[0] == '\0') { | ||
| unsetenv(var); | ||
| cleared_any = true; | ||
| } | ||
| } | ||
|
|
||
| if (cleared_any) { | ||
| LOG_INFO("Cleared empty proxy environment variables (Paho MQTT workaround)"); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate with proxy_utils function?
| client_id_(generateClientId()) { | ||
| // Paho MQTT library cannot handle empty proxy environment variables. | ||
| // Clear them if empty, but preserve real proxy URLs for production use. | ||
| clearEmptyProxyVars(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call to utility function instead?
| clearEmptyProxyVars(); | |
| clearEmptyProxyEnvVars(); |
| } | ||
|
|
||
| void MqttClient::subscribe(const std::string& topic) { | ||
| pending_subscriptions_.insert(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accesing pending_subscriptions_ is not threat-safe, while it should be.
The race: If subscribe() is called while connected() iterates the set, you have concurrent read/write - undefined behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 38 out of 40 changed files in this pull request and generated no new comments.
| int delay_s = initial_ms / 1000; | ||
| for (int i = 0; i < attempt; ++i) { | ||
| delay_s *= 2; | ||
| if (delay_s >= max_delay_s) { | ||
| delay_s = max_delay_s; | ||
| break; | ||
| } | ||
| } | ||
| return std::chrono::milliseconds(delay_s * 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing that conversion to sec is to avoid overflow. The problem with that implementation is that we are loosing precision. Please request explicitly operation on int64_t - it should be safe, without conversion to seconds.
| int delay_s = initial_ms / 1000; | |
| for (int i = 0; i < attempt; ++i) { | |
| delay_s *= 2; | |
| if (delay_s >= max_delay_s) { | |
| delay_s = max_delay_s; | |
| break; | |
| } | |
| } | |
| return std::chrono::milliseconds(delay_s * 1000); | |
| int64_t cap_ms = static_cast<int64_t>(max_delay_s) * 1000; | |
| int64_t delay_ms = static_cast<int64_t>(initial_ms); | |
| for (int i = 0; i < attempt; ++i) { | |
| delay_ms *= 2; | |
| if (delay_ms >= cap_ms) { | |
| delay_ms = cap_ms; | |
| break; | |
| } | |
| } | |
| return std::chrono::milliseconds(delay_ms); |
📝 Description
For description see #923 (review).
Target for the next PR is to introduce proper time-chunking tracking for OOB scenes configured statically via config file.
Testing
Validated by unit tests, service tests and manually for OOB scenes. The tracker subscribes to scenescape/data/camera/+ and on each received message publishes dummy data to scenescape/data/scene/dummy-scene/thing.
Related PRs
✨ Type of Change
Select the type of change your PR introduces:
🧪 Testing Scenarios
Describe how the changes were tested and how reviewers can test them too:
✅ Checklist
Before submitting the PR, ensure the following: