Skip to content
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
45bbe9d
Initial commit
dmytroye Dec 8, 2025
56e31ed
Add debug logging
dmytroye Dec 8, 2025
77b39b6
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 9, 2025
71381ca
Phyton indent fix
dmytroye Dec 9, 2025
1211d39
Phyton indent fix
dmytroye Dec 9, 2025
070cfbd
Phyton indent fix
dmytroye Dec 9, 2025
36c81e4
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 9, 2025
57c1b6f
Fix SyntaxError
dmytroye Dec 9, 2025
3674cb9
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 10, 2025
22252fe
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 10, 2025
98bfa61
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 10, 2025
4f7333a
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 11, 2025
66570c7
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 11, 2025
1e97534
Apply suggestions from code review
dmytroye Dec 11, 2025
41eaab7
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 11, 2025
8b0abe1
Merge remote-tracking branch 'origin/main' into dmytroye/subscribe-an…
dmytroye Dec 15, 2025
6f570f9
Merge branch 'dmytroye/subscribe-analytics-to-mqtt' of https://github…
dmytroye Dec 15, 2025
6c93325
Add flag to disable tracker in controller
dmytroye Dec 15, 2025
93196d9
Add flag to demo example docker-compose
dmytroye Dec 15, 2025
45d6b91
Indent fix
dmytroye Dec 15, 2025
ef542a9
Apply suggestions from code review
dmytroye Dec 15, 2025
a82f36a
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 15, 2025
8e19ac7
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 16, 2025
e4d9375
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 17, 2025
c146c78
Dissable tracker flag
dmytroye Dec 17, 2025
38a9bbb
If tracker is enabled, use direct tracker call
dmytroye Dec 17, 2025
5922c7c
Phyton ident
dmytroye Dec 17, 2025
07157cd
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 17, 2025
05b101f
Fix errors on MQTT messeges
dmytroye Dec 17, 2025
45b6dc2
Remove logs and debug changes
dmytroye Dec 17, 2025
4359918
Whitespaces
dmytroye Dec 17, 2025
317fffe
Get frame rate from mqtt
dmytroye Dec 17, 2025
4d131d6
Don't subscribe to camera and sensor topics when --tracker-disabled
dmytroye Dec 18, 2025
b1086af
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 18, 2025
01e4e6e
Subscribe to scene/data only when tracke disabled
dmytroye Dec 18, 2025
6a8b258
Delete whitespace
dmytroye Dec 18, 2025
77b5b44
Disable publishing to data/scene when tracke is dissabled
dmytroye Dec 18, 2025
4c0e0fc
Add logging
dmytroye Dec 18, 2025
e6302e3
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 19, 2025
5b69ee2
WIP on mqtt messeges
dmytroye Dec 19, 2025
106a768
WIP debug
dmytroye Dec 19, 2025
3e13572
remove field
dmytroye Dec 19, 2025
5f10e07
WIP
dmytroye Dec 19, 2025
25fc819
Remove whitespace
dmytroye Dec 19, 2025
e72ca28
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 22, 2025
231fc75
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 22, 2025
c1e3788
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 29, 2025
4cba62e
Change order
dmytroye Dec 29, 2025
73bc1d9
WIP
dmytroye Dec 29, 2025
98ac73a
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Dec 30, 2025
2fd07c5
Fix analytics mode: preserve camera_bounds and ref_camera_frame_rate …
dmytroye Dec 30, 2025
2bab1a7
Remove _normalize_camera_bounds
dmytroye Dec 30, 2025
e650bd3
CleanUp
dmytroye Dec 31, 2025
6478510
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Jan 12, 2026
fdf10fd
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Jan 12, 2026
b94a6f7
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Jan 12, 2026
9263330
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Jan 15, 2026
d387d10
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Jan 15, 2026
d6fe93c
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
dmytroye Jan 15, 2026
8ba7785
Merge branch 'main' into dmytroye/subscribe-analytics-to-mqtt
scenescapecicd Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions controller/src/controller/detections_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def prepareObjDict(scene, obj, update_visibility):
'velocity': velocity.asCartesianVector
})

# Add frame count for analytics
if hasattr(aobj, 'frameCount'):
obj_dict['frame_count'] = aobj.frameCount

rotation = aobj.rotation
if rotation is not None:
obj_dict['rotation'] = rotation
Expand Down
82 changes: 81 additions & 1 deletion controller/src/controller/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

import itertools
from types import SimpleNamespace
from typing import Optional

import numpy as np
Expand Down Expand Up @@ -58,6 +59,9 @@ def __init__(self, name, map_file, scale=None,
self._trs_xyz_to_lla = None
self.use_tracker = True

# Cache for tracked objects from MQTT (for analytics)
self.tracked_objects_cache = {}

# FIXME - only for backwards compatibility
self.scale = scale

Expand Down Expand Up @@ -294,10 +298,86 @@ def processSensorData(self, jdata, when):

return True

def updateTrackedObjects(self, detection_type, objects):
"""
Update the cache of tracked objects from MQTT.
This is used by Analytics to consume tracked objects published by the Tracker service.

Args:
detection_type: The type of detection (e.g., 'person', 'vehicle')
objects: List of tracked objects for this detection type
"""
self.tracked_objects_cache[detection_type] = objects
return

def getTrackedObjects(self, detection_type):
"""
Get tracked objects from cache (MQTT) or fallback to direct tracker call.

Args:
detection_type: The type of detection

Returns:
List of tracked objects (MovingObject instances or serialized dicts)
"""
# First try to get from cache (MQTT-based)
if detection_type in self.tracked_objects_cache:
cached_objects = self.tracked_objects_cache[detection_type]
if cached_objects and isinstance(cached_objects[0], dict):
return self._deserializeTrackedObjects(cached_objects)
log.debug("Using cached tracked objects for detection type:", detection_type)
return cached_objects

# Fallback to direct tracker call (for backward compatibility)
if self.tracker is not None:
log.debug("Using direct tracker call for detection type:", detection_type)
return self.tracker.currentObjects(detection_type)

return []

def _deserializeTrackedObjects(self, serialized_objects):
"""
Convert serialized tracked objects to a format usable by Analytics.
This creates lightweight wrappers that mimic MovingObject interface.

Args:
serialized_objects: List of serialized object dictionaries

Returns:
List of object-like structures with necessary attributes
"""
from types import SimpleNamespace

objects = []
for obj_data in serialized_objects:
# Create a simple object that has the necessary attributes
obj = SimpleNamespace()
obj.gid = obj_data.get('id')
obj.category = obj_data.get('type')
obj.sceneLoc = Point(obj_data.get('translation', [0, 0, 0]))
obj.velocity = Point(obj_data.get('velocity', [0, 0, 0])) if obj_data.get('velocity') else None
obj.size = obj_data.get('size')
obj.confidence = obj_data.get('confidence')
obj.frameCount = obj_data.get('frame_count', 0)
obj.when = get_epoch_time(obj_data.get('first_seen')) if 'first_seen' in obj_data else get_epoch_time()
obj.visibility = obj_data.get('visibility', [])

# Chain data for regions, sensors, and published locations
obj.chain_data = SimpleNamespace()
obj.chain_data.regions = obj_data.get('regions', {})
obj.chain_data.sensors = obj_data.get('sensors', {})
obj.chain_data.persist = obj_data.get('persistent_data', {})
# Initialize publishedLocations - will be populated by _updateEvents
obj.chain_data.publishedLocations = [obj.sceneLoc]

objects.append(obj)

return objects

def _updateEvents(self, detectionType, now):
self.events = {}
now_str = get_iso_time(now)
curObjects = self.tracker.currentObjects(detectionType)
curObjects = self.getTrackedObjects(detectionType)
for obj in curObjects:
obj.chain_data.publishedLocations.insert(0, obj.sceneLoc)

Expand Down
32 changes: 32 additions & 0 deletions controller/src/controller/scene_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,31 @@ def handleMovingObjectMessage(self, client, userdata, message):
self.publishEvents(scene, jdata['timestamp'])
return

def handleSceneDataMessage(self, client, userdata, message):
"""
Handle scene data messages (tracked objects) published to DATA_SCENE topic.
This updates the Analytics cache with tracked objects from the existing topic.
"""
topic = PubSub.parseTopic(message.topic)
jdata = orjson.loads(message.payload.decode('utf-8'))

scene_id = topic['scene_id']
detection_type = topic['thing_type']

scene = self.cache_manager.sceneWithID(scene_id)
if scene is None:
log.debug("Scene not found for tracked objects, ignoring", scene_id)
return

# Extract tracked objects from the existing DATA_SCENE message
tracked_objects = jdata.get('objects', [])

# Update the analytics cache with tracked objects
scene.updateTrackedObjects(detection_type, tracked_objects)

log.debug(f"Updated tracked objects cache for scene {scene.name}, type {detection_type}, count {len(tracked_objects)}")
return

def _handleChildSceneObject(self, sender_id, jdata, detection_type, msg_when):
sender = self.cache_manager.sceneWithID(sender_id)
if sender is None:
Expand Down Expand Up @@ -593,6 +618,13 @@ def updateSubscriptions(self):
for sensor in scene.sensors:
need_subscribe.add((PubSub.formatTopic(PubSub.DATA_SENSOR, sensor_id=sensor),
self.handleSensorMessage))

# Subscribe to scene data (tracked objects) for Analytics to consume
# This reuses the existing DATA_SCENE topic that tracker already publishes to
need_subscribe.add((PubSub.formatTopic(PubSub.DATA_SCENE,
scene_id=scene.uid, thing_type="+"),
self.handleSceneDataMessage))

if hasattr(scene, 'children'):
child_scenes = self.cache_manager.data_source.getChildScenes(scene.uid)

Expand Down
Loading