diff --git a/controller/src/controller-cmd b/controller/src/controller-cmd index f5b166840..025764cb7 100755 --- a/controller/src/controller-cmd +++ b/controller/src/controller-cmd @@ -43,6 +43,8 @@ def build_argparser(): parser.add_argument("--visibility_topic", help="Which topic to publish visibility on." "Valid options are 'unregulated', 'regulated', or 'none'", default="regulated") + parser.add_argument("--disable-tracker", action="store_true", + help="Disable tracker functionality. Use this when running a separate Tracker service that produces data to MQTT.") return parser def main(): @@ -54,7 +56,7 @@ def main(): args.brokerauth, args.resturl, args.restauth, args.cert, args.rootcert, args.ntp, args.tracker_config_file, args.schema_file, - args.visibility_topic, args.data_source) + args.visibility_topic, args.data_source, args.disable_tracker) controller.loopForever() return diff --git a/controller/src/controller/cache_manager.py b/controller/src/controller/cache_manager.py index 50ad41433..4a8fb8f7f 100644 --- a/controller/src/controller/cache_manager.py +++ b/controller/src/controller/cache_manager.py @@ -11,10 +11,11 @@ class CacheManager: def __init__(self, data_source=None, rest_url=None, rest_auth=None, - root_cert=None, tracker_config_data={}): + root_cert=None, tracker_config_data={}, disable_tracker=False): self.cached_child_transforms_by_uid = {} self.camera_parameters = {} self.tracker_config_data = tracker_config_data + self.disable_tracker = disable_tracker self.cached_scenes_by_uid = {} self._cached_scenes_by_cameraID = {} self._cached_scenes_by_sensorID = {} @@ -59,7 +60,7 @@ def refreshScenes(self): uid = scene_data['uid'] if uid not in self.cached_scenes_by_uid: - scene = Scene.deserialize(scene_data) + scene = Scene.deserialize(scene_data, self.disable_tracker) else: scene = self.cached_scenes_by_uid[uid] scene.updateScene(scene_data) diff --git a/controller/src/controller/detections_builder.py b/controller/src/controller/detections_builder.py index ea276e7f4..585b6c1c4 100644 --- a/controller/src/controller/detections_builder.py +++ b/controller/src/controller/detections_builder.py @@ -65,8 +65,18 @@ def prepareObjDict(scene, obj, update_visibility): if hasattr(aobj, 'visibility'): obj_dict['visibility'] = aobj.visibility - if update_visibility: + + if hasattr(aobj, 'similarity'): + obj_dict['similarity'] = aobj.similarity + if hasattr(aobj, 'first_seen'): + obj_dict['first_seen'] = get_iso_time(aobj.first_seen) + + if hasattr(aobj, 'visibility'): + if update_visibility and not scene.disable_tracker: computeCameraBounds(scene, aobj, obj_dict) + elif scene.disable_tracker and hasattr(aobj, '_camera_bounds'): + if aobj._camera_bounds: + obj_dict['camera_bounds'] = aobj._camera_bounds chain_data = aobj.chain_data if len(chain_data.regions): @@ -79,6 +89,11 @@ def prepareObjDict(scene, obj, update_visibility): obj_dict['similarity'] = aobj.similarity if hasattr(aobj, 'first_seen'): obj_dict['first_seen'] = get_iso_time(aobj.first_seen) + + if not update_visibility and hasattr(aobj, '_camera_bounds') and aobj._camera_bounds: + if 'camera_bounds' not in obj_dict: + obj_dict['camera_bounds'] = aobj._camera_bounds + if isinstance(obj, TripwireEvent): obj_dict['direction'] = obj.direction if hasattr(aobj, 'asset_scale'): @@ -88,10 +103,12 @@ def prepareObjDict(scene, obj, update_visibility): return obj_dict def computeCameraBounds(scene, aobj, obj_dict): - camera_bounds = {} + camera_bounds = obj_dict.get('camera_bounds', {}) or {} + if 'visibility' not in obj_dict: + return for cameraID in obj_dict['visibility']: bounds = None - if aobj and hasattr(aobj.vectors[0].camera, 'cameraID') \ + if aobj and len(aobj.vectors) > 0 and hasattr(aobj.vectors[0].camera, 'cameraID') \ and cameraID == aobj.vectors[0].camera.cameraID: bounds = getattr(aobj, 'boundingBoxPixels', None) elif scene: diff --git a/controller/src/controller/scene.py b/controller/src/controller/scene.py index 59831d654..13bf43797 100644 --- a/controller/src/controller/scene.py +++ b/controller/src/controller/scene.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import itertools +from types import SimpleNamespace from typing import Optional import numpy as np @@ -44,9 +45,12 @@ def __init__(self, name, map_file, scale=None, non_measurement_time_static = NON_MEASUREMENT_TIME_STATIC, effective_object_update_rate = EFFECTIVE_OBJECT_UPDATE_RATE, time_chunking_enabled = False, - time_chunking_rate_fps = DEFAULT_CHUNKING_RATE_FPS): + time_chunking_rate_fps = DEFAULT_CHUNKING_RATE_FPS, + disable_tracker = False): + log.info("NEW SCENE", name, map_file, scale, max_unreliable_time, - non_measurement_time_dynamic, non_measurement_time_static) + non_measurement_time_dynamic, non_measurement_time_static, + "tracker_disabled=" + str(disable_tracker)) super().__init__(name, map_file, scale) self.ref_camera_frame_rate = time_chunking_rate_fps if time_chunking_enabled else effective_object_update_rate self.max_unreliable_time = max_unreliable_time @@ -56,9 +60,18 @@ def __init__(self, name, map_file, scale=None, self.trackerType = None self.persist_attributes = {} self.time_chunking_rate_fps = time_chunking_rate_fps - self._setTracker("time_chunked_intel_labs" if time_chunking_enabled else self.DEFAULT_TRACKER) + self.disable_tracker = disable_tracker + + if not disable_tracker: + self._setTracker("time_chunked_intel_labs" if time_chunking_enabled else self.DEFAULT_TRACKER) + else: + log.info("Tracker initialization SKIPPED for scene: " + name) + self._trs_xyz_to_lla = None - self.use_tracker = True + self.use_tracker = not disable_tracker + + # Cache for tracked objects from MQTT (for analytics) + self.tracked_objects_cache = {} # FIXME - only for backwards compatibility self.scale = scale @@ -137,6 +150,12 @@ def _createMovingObjectsForDetection(self, detectionType, detections, when, came return objects def processCameraData(self, jdata, when=None, ignoreTimeFlag=False): + + # Skip processing if tracker is disabled - data should come from separate Tracker service via MQTT + if self.disable_tracker: + log.debug(f"Tracker disabled, skipping camera data processing for camera {camera_id}") + return True + camera_id = jdata['id'] camera = None @@ -155,6 +174,7 @@ def processCameraData(self, jdata, when=None, ignoreTimeFlag=False): if not hasattr(camera, 'pose'): log.info("DISCARDING: camera has no pose") return True + for detection_type, detections in jdata['objects'].items(): if "intrinsics" not in jdata: self._convertPixelBoundingBoxesToMeters(detections, camera.pose.intrinsics.intrinsics, camera.pose.intrinsics.distortion) @@ -213,6 +233,15 @@ def processSceneData(self, jdata, child, cameraPose, detectionType, when=None): new = jdata['objects'] + # Update ref_camera_frame_rate before early return (needed for analytics mode) + if 'frame_rate' in jdata: + self.ref_camera_frame_rate = min(jdata['frame_rate'], self.ref_camera_frame_rate) if self.ref_camera_frame_rate is not None else jdata["frame_rate"] + + # Skip processing if tracker is disabled + if self.disable_tracker: + log.debug(f"Tracker disabled, skipping scene data processing for child {child.name if hasattr(child, 'name') else 'unknown'}") + return True + objects = [] child_objects = [] for info in new: @@ -243,12 +272,13 @@ def processSceneData(self, jdata, child, cameraPose, def _finishProcessing(self, detectionType, when, objects, already_tracked_objects=[]): self._updateVisible(objects) - self.tracker.trackObjects(objects, already_tracked_objects, when, [detectionType], - self.ref_camera_frame_rate, - self.max_unreliable_time, - self.non_measurement_time_dynamic, - self.non_measurement_time_static, - self.use_tracker) + if not self.disable_tracker: + self.tracker.trackObjects(objects, already_tracked_objects, when, [detectionType], + self.ref_camera_frame_rate, + self.max_unreliable_time, + self.non_measurement_time_dynamic, + self.non_measurement_time_static, + self.use_tracker) self._updateEvents(detectionType, when) return @@ -293,10 +323,118 @@ def processSensorData(self, jdata, when): return True + def updateTrackedObjects(self, detection_type, objects): + """ + Update the cache of tracked objects from MQTT. + This is used by Analytics to consume tracked objects published by the Tracker service. + + Args: + detection_type: The type of detection (e.g., 'person', 'vehicle') + objects: List of tracked objects for this detection type + """ + self.tracked_objects_cache[detection_type] = objects + return + + def getTrackedObjects(self, detection_type): + """ + Get tracked objects from cache (MQTT) or direct tracker call. + + Args: + detection_type: The type of detection + + Returns: + List of tracked objects (MovingObject instances or serialized dicts) + """ + # If tracker is disabled, only use MQTT cache (from separate Tracker service) + if self.disable_tracker: + if detection_type in self.tracked_objects_cache: + cached_objects = self.tracked_objects_cache[detection_type] + if isinstance(cached_objects, list) and len(cached_objects) > 0 and isinstance(cached_objects[0], dict): + return self._deserializeTrackedObjects(cached_objects) + else: + log.debug("Using cached tracked objects from MQTT for detection type:", detection_type) + return cached_objects + return [] + + # If tracker is enabled, use direct tracker call (traditional mode) + if self.tracker is not None: + log.debug("Using direct tracker call for detection type:", detection_type) + return self.tracker.currentObjects(detection_type) + + return [] + + def _deserializeTrackedObjects(self, serialized_objects): + """ + Convert serialized tracked objects to a format usable by Analytics. + This creates lightweight wrappers that mimic MovingObject interface. + + Args: + serialized_objects: List of serialized object dictionaries + + Returns: + List of object-like structures with necessary attributes + """ + + objects = [] + for obj_data in serialized_objects: + # Create a simple object that has the necessary attributes + obj = SimpleNamespace() + obj.gid = obj_data.get('id') + obj.category = obj_data.get('type', obj_data.get('category')) + obj.sceneLoc = Point(obj_data.get('translation', [0, 0, 0])) + obj.velocity = Point(obj_data.get('velocity', [0, 0, 0])) if obj_data.get('velocity') else None + obj.size = obj_data.get('size') + obj.confidence = obj_data.get('confidence') + obj.frameCount = obj_data.get('frame_count', 0) + obj.rotation = obj_data.get('rotation') + obj.reidVector = obj_data.get('reid') + obj.similarity = obj_data.get('similarity') + obj.vectors = [] # Empty list - tracked objects from MQTT don't have detection vectors + obj.boundingBoxPixels = None # Will use camera_bounds from obj_data if available + + if 'first_seen' in obj_data: + obj.when = get_epoch_time(obj_data.get('first_seen')) + obj.first_seen = obj.when + else: + obj.when = None + obj.first_seen = None + log.warning(f"Missing 'first_seen' for object id {obj_data.get('id')}; setting obj.when to None.") + obj.visibility = obj_data.get('visibility', []) + + # Create info dict with original object data (needed by prepareObjDict) + obj.info = { + 'category': obj.category, + 'confidence': obj.confidence, + } + + # Add center_of_mass if available + if 'center_of_mass' in obj_data: + obj.info['center_of_mass'] = obj_data['center_of_mass'] + + if 'camera_bounds' in obj_data and obj_data['camera_bounds']: + obj._camera_bounds = obj_data['camera_bounds'] + else: + obj._camera_bounds = None + + # Chain data for regions, sensors, and published locations + obj.chain_data = SimpleNamespace() + obj.chain_data.regions = obj_data.get('regions', {}) + obj.chain_data.sensors = obj_data.get('sensors', {}) + obj.chain_data.persist = obj_data.get('persistent_data', {}) + # Initialize publishedLocations - will be populated by _updateEvents + obj.chain_data.publishedLocations = [obj.sceneLoc] + + objects.append(obj) + + return objects + def _updateEvents(self, detectionType, now): self.events = {} now_str = get_iso_time(now) - curObjects = self.tracker.currentObjects(detectionType) + if self.disable_tracker: + curObjects = self.getTrackedObjects(detectionType) + else: + curObjects = self.tracker.currentObjects(detectionType) if self.tracker else [] for obj in curObjects: obj.chain_data.publishedLocations.insert(0, obj.sceneLoc) @@ -425,20 +563,27 @@ def _updateVisible(self, curObjects): return @classmethod - def deserialize(cls, data): + def deserialize(cls, data, disable_tracker=False): tracker_config = data.get('tracker_config', []) - scene = cls(data['name'], data.get('map', None), data.get('scale', None), - *tracker_config) + scale_from_data = data.get('scale', None) + if scale_from_data is None and disable_tracker: + log.warning(f"Scene '{data.get('name')}': scale is None when deserializing in disable_tracker mode. Ensure scale is configured in the database or scene JSON file.") + scene = cls(data['name'], data.get('map', None), scale_from_data, + *tracker_config, disable_tracker=disable_tracker) scene.uid = data['uid'] scene.mesh_translation = data.get('mesh_translation', None) scene.mesh_rotation = data.get('mesh_rotation', None) - scene.use_tracker = data.get('use_tracker', True) + scene.use_tracker = data.get('use_tracker', True) and not disable_tracker + scene.disable_tracker = disable_tracker scene.output_lla = data.get('output_lla', None) scene.map_corners_lla = data.get('map_corners_lla', None) scene.retrack = data.get('retrack', True) scene.regulated_rate = data.get('regulated_rate', None) scene.external_update_rate = data.get('external_update_rate', None) scene.persist_attributes = data.get('persist_attributes', {}) + # Ensure scale is set from data even if __init__ didn't handle it correctly + if 'scale' in data: + scene.scale = data['scale'] if 'cameras' in data: scene.updateCameras(data['cameras']) if 'regions' in data: diff --git a/controller/src/controller/scene_controller.py b/controller/src/controller/scene_controller.py index ee0d0d4da..add4d746c 100644 --- a/controller/src/controller/scene_controller.py +++ b/controller/src/controller/scene_controller.py @@ -30,7 +30,7 @@ class SceneController: def __init__(self, rewrite_bad_time, rewrite_all_time, max_lag, mqtt_broker, mqtt_auth, rest_url, rest_auth, client_cert, root_cert, ntp_server, - tracker_config_file, schema_file, visibility_topic, data_source): + tracker_config_file, schema_file, visibility_topic, data_source, disable_tracker=False): self.cert = client_cert self.root_cert = root_cert self.rewrite_bad_time = rewrite_bad_time @@ -41,9 +41,14 @@ def __init__(self, rewrite_bad_time, rewrite_all_time, max_lag, mqtt_broker, self.mqtt_auth = mqtt_auth self.tracker_config_data = {} self.tracker_config_file = tracker_config_file + self.disable_tracker = disable_tracker + + if disable_tracker: + log.info("Tracker is DISABLED. Controller will run without tracker functionality.") + pass + if tracker_config_file is not None: self.extractTrackerConfigData(tracker_config_file) - self.last_time_sync = None self.ntp_server = ntp_server self.ntp_client = ntplib.NTPClient() @@ -55,7 +60,7 @@ def __init__(self, rewrite_bad_time, rewrite_all_time, max_lag, mqtt_broker, self.pubsub.onConnect = self.onConnect self.pubsub.connect() - self.cache_manager = CacheManager(data_source, rest_url, rest_auth, root_cert, self.tracker_config_data) + self.cache_manager = CacheManager(data_source, rest_url, rest_auth, root_cert, self.tracker_config_data, self.disable_tracker) self.visibility_topic = visibility_topic log.info(f"Publishing camera visibility info on {self.visibility_topic} topic.") @@ -131,7 +136,10 @@ def publishDetections(self, scene, objects, ts, otype, jdata, camera_id): "scene": scene.name } metrics.record_object_count(len(objects), metric_attributes) - self.publishSceneDetections(scene, objects, otype, jdata) + # Only publish to DATA_SCENE topic when tracker is enabled + # When tracker is disabled, we consume from DATA_SCENE instead of publishing to it + if not self.disable_tracker: + self.publishSceneDetections(scene, objects, otype, jdata) self.publishRegulatedDetections(scene, objects, otype, jdata, camera_id) self.publishRegionDetections(scene, objects, otype, jdata) return @@ -175,9 +183,24 @@ def publishRegulatedDetections(self, scene_obj, msg_objects, otype, jdata, camer 'last': None } scene = self.regulate_cache[scene_uid] - scene['objects'][otype] = jdata['objects'] + + # Build the objects list from msg_objects (handles both tracker enabled and disabled) + scene['objects'][otype] = buildDetectionsList(msg_objects, scene_obj, self.visibility_topic == 'unregulated') + + # Store the incoming rate from MQTT message or camera if camera_id is not None: scene['rate'][camera_id] = jdata.get('rate', None) + elif self.disable_tracker and 'rate' in jdata: + # When tracker is disabled, distribute the scene rate to all visible cameras + # Extract unique camera IDs from all objects' visibility lists + camera_ids = set() + for obj in jdata.get('objects', []): + camera_ids.update(obj.get('visibility', [])) + + # Store the same rate for each camera that has visibility + scene_rate = jdata['rate'] + for cam_id in camera_ids: + scene['rate'][cam_id] = scene_rate now = get_epoch_time() if self.shouldPublish(scene['last'], now, 1/scene_obj.regulated_rate): @@ -198,6 +221,7 @@ def publishRegulatedDetections(self, scene_obj, msg_objects, otype, jdata, camer if aobj is not None: computeCameraBounds(scene_obj, aobj, obj) objects.append(obj) + log.debug(f"Publishing regulated: scene={scene_uid}, objects_count={len(objects)}, types={list(scene['objects'].keys())}") new_jdata = { 'timestamp': jdata['timestamp'], 'objects': objects, @@ -359,10 +383,13 @@ def handleSensorMessage(self, client, userdata, message): return def handleMovingObjectMessage(self, client, userdata, message): + # When tracker is disabled, we don't process camera messages + if self.disable_tracker: + return + topic = PubSub.parseTopic(message.topic) jdata = orjson.loads(message.payload.decode('utf-8')) - metric_attributes = { "topic": message.topic, "camera": jdata.get("id", "unknown"), @@ -435,6 +462,44 @@ def handleMovingObjectMessage(self, client, userdata, message): self.publishEvents(scene, jdata['timestamp']) return + def handleSceneDataMessage(self, client, userdata, message): + """ + Handle scene data messages (tracked objects) published to DATA_SCENE topic. + This updates the Analytics cache with tracked objects from the existing topic. + When tracker is disabled, this also publishes analytics results. + """ + topic = PubSub.parseTopic(message.topic) + jdata = orjson.loads(message.payload.decode('utf-8')) + + scene_id = topic['scene_id'] + detection_type = topic['thing_type'] + log.debug(f"Received scene data message: scene={scene_id}, type={detection_type}, objects={len(jdata.get('objects', []))}") + + scene = self.cache_manager.sceneWithID(scene_id) + if scene is None: + log.warning(f"Scene not found for tracked objects, ignoring scene_id={scene_id}") + return + + # Extract tracked objects from the existing DATA_SCENE message + tracked_objects = jdata.get('objects', []) + + # Update the analytics cache with tracked objects + scene.updateTrackedObjects(detection_type, tracked_objects) + + # When tracker is disabled, we need to publish analytics based on tracked objects from MQTT + if self.disable_tracker: + analytics_objects = scene.getTrackedObjects(detection_type) + log.debug(f"Tracker disabled - received objects: scene={scene_id}, type={detection_type}, count={len(analytics_objects)}") + scene._updateVisible(analytics_objects) + + msg_when = get_epoch_time(jdata.get('timestamp')) + + # Publish detections using the tracked objects + self.publishDetections(scene, analytics_objects, msg_when, detection_type, jdata, None) + self.publishEvents(scene, jdata.get('timestamp')) + + return + def _handleChildSceneObject(self, sender_id, jdata, detection_type, msg_when): sender = self.cache_manager.sceneWithID(sender_id) if sender is None: @@ -516,7 +581,8 @@ def updateObjectClasses(self): results = self.cache_manager.data_source.getAssets() if results and 'results' in results: for scene in self.scenes: - scene.tracker.updateObjectClasses(results['results']) + if scene.tracker is not None: + scene.tracker.updateObjectClasses(results['results']) return def updateTRSMatrix(self): @@ -594,12 +660,21 @@ def updateSubscriptions(self): self.scenes = self.cache_manager.allScenes() for scene in self.scenes: - for camera in scene.cameras: - need_subscribe.add((PubSub.formatTopic(PubSub.DATA_CAMERA, camera_id=camera), - self.handleMovingObjectMessage)) - for sensor in scene.sensors: - need_subscribe.add((PubSub.formatTopic(PubSub.DATA_SENSOR, sensor_id=sensor), - self.handleSensorMessage)) + # Only subscribe to camera and sensor topics when tracker is enabled + if not self.disable_tracker: + for camera in scene.cameras: + need_subscribe.add((PubSub.formatTopic(PubSub.DATA_CAMERA, camera_id=camera), + self.handleMovingObjectMessage)) + for sensor in scene.sensors: + need_subscribe.add((PubSub.formatTopic(PubSub.DATA_SENSOR, sensor_id=sensor), + self.handleSensorMessage)) + + # Subscribe to scene data (tracked objects) for Analytics to consume + # This reuses the existing DATA_SCENE topic that tracker already publishes to + if self.disable_tracker: + need_subscribe.add((PubSub.formatTopic(PubSub.DATA_SCENE, scene_id=scene.uid, thing_type="+"), + self.handleSceneDataMessage)) + if hasattr(scene, 'children'): child_scenes = self.cache_manager.data_source.getChildScenes(scene.uid) diff --git a/sample_data/docker-compose-dl-streamer-example.yml b/sample_data/docker-compose-dl-streamer-example.yml index 2fe22640c..355023578 100644 --- a/sample_data/docker-compose-dl-streamer-example.yml +++ b/sample_data/docker-compose-dl-streamer-example.yml @@ -169,6 +169,7 @@ services: --brokerauth /run/secrets/controller.auth --broker broker.scenescape.intel.com --ntp ntpserv + # --disable-tracker # mount the trackerconfig file to the container configs: - source: tracker-config