Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ broken-tests: \
april-tag-setup \
calibrate-camera-3d-ui-2d-ui \
out-of-box-no-ntp \
distance-msoce \
geospatial-ingest-publish \
delete-sensor-mqtt-api \

Expand Down Expand Up @@ -186,6 +185,7 @@ _metric-tests: \
idc-error-metric \
velocity-metric \
msoce-metric \
distance-msoce \

ui-tests:
$(MAKE) -Otarget _$@ SECRETSDIR=$(PWD)/manager/secrets SUPASS=$(SUPASS) -k
Expand Down
90 changes: 87 additions & 3 deletions tests/system/metric/tc_distance_thresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
# SPDX-FileCopyrightText: (C) 2024 - 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import json
import os
import time

import cv2

import controller.tools.analytics.library.json_helper as json_helper
import controller.tools.analytics.library.metrics as metrics
import tests.common_test_utils as common
from controller.detections_builder import buildDetectionsList
from controller.scene import Scene
from scene_common.json_track_data import CamManager
from scene_common.scenescape import SceneLoader
from scene_common.camera import Camera
from scene_common.geometry import Region, Tripwire


def get_detections(tracked_data, scene, objects, jdata):
Expand Down Expand Up @@ -40,20 +48,95 @@ def track(params):
@param params Dict of parameters needed for tracking
@return tracked_data The filled list of tracked data
"""

tracked_data = []
scene = SceneLoader(params["config"], scene_model=Scene).scene

with open(params["trackerconfig"]) as f:
trackerConfigData = json.load(f)
max_unreliable_time = trackerConfigData["max_unreliable_frames"]/trackerConfigData["baseline_frame_rate"]
non_measurement_time_dynamic = trackerConfigData["non_measurement_frames_dynamic"]/trackerConfigData["baseline_frame_rate"]
non_measurement_time_static = trackerConfigData["non_measurement_frames_static"]/trackerConfigData["baseline_frame_rate"]
time_chunking_enabled = trackerConfigData["time_chunking_enabled"]
time_chunking_interval_ms = trackerConfigData["time_chunking_interval_milliseconds"]

camera_fps = []
for input_file in params["input"]:
cam = cv2.VideoCapture(input_file.removesuffix('.json')+'.mp4')
fps = cam.get(cv2.CAP_PROP_FPS)
if fps == 0.0:
fps = int(params["default_camera_frame_rate"]) # default value
camera_fps.append(fps)
cam.release()
ref_camera_fps = int(min(camera_fps))

if time_chunking_enabled:
time_chunking_interval_ms = int((1 / ref_camera_fps) * 1000)
print(f"Time chunking ENABLED with interval: {time_chunking_interval_ms}ms for {ref_camera_fps} FPS")
else:
print("Time chunking DISABLED")

loader = SceneLoader(params["config"])
scene_config = loader.config

scene = Scene(
scene_config['name'],
scene_config.get('map'),
scene_config.get('scale'),
max_unreliable_time=max_unreliable_time,
non_measurement_time_dynamic=non_measurement_time_dynamic,
non_measurement_time_static=non_measurement_time_static,
time_chunking_enabled=time_chunking_enabled,
time_chunking_interval_milliseconds=time_chunking_interval_ms
)

if 'sensors' in scene_config:
for name in scene_config['sensors']:
info = scene_config['sensors'][name]
if 'map points' in info:
if scene.areCoordinatesInPixels(info['map points']):
info['map points'] = scene.mapPixelsToMetric(info['map points'])
camera = Camera(name, info)
scene.cameras[name] = camera

if 'regions' in scene_config:
for region in scene_config['regions']:
points = region['points']
if scene.areCoordinatesInPixels(points):
region['points'] = scene.mapPixelsToMetric(points)
region_obj = Region(region['uuid'], region['name'], {'points': region['points']})
scene.regions[region_obj.name] = region_obj

if 'tripwires' in scene_config:
for tripwire in scene_config['tripwires']:
points = tripwire['points']
if scene.areCoordinatesInPixels(points):
points = scene.mapPixelsToMetric(points)
tripwire_obj = Tripwire(tripwire['uuid'], tripwire['name'], {'points': points})
scene.tripwires[tripwire_obj.name] = tripwire_obj

scene.ref_camera_frame_rate = ref_camera_fps
mgr = CamManager(params["input"], scene)

detected_category = None
if 'assets' in params:
scene.tracker.updateObjectClasses(params['assets'])

frame_interval = 1.0 / ref_camera_fps if time_chunking_enabled else 0
start_time = time.time()
frame_count = 0

while True:
_, cam_detect, _ = mgr.nextFrame(scene, loop=False)
if not cam_detect:
break
objects = cam_detect["objects"]

if time_chunking_enabled:
frame_count += 1
expected_time = start_time + (frame_count * frame_interval)
current_time = time.time()
sleep_time = expected_time - current_time
if sleep_time > 0:
time.sleep(sleep_time)

scene.processCameraData(cam_detect)

jdata = {
Expand All @@ -63,6 +146,7 @@ def track(params):
}
get_detections(tracked_data, scene, objects, jdata)

scene.tracker.waitForComplete()
scene.tracker.join()
return tracked_data

Expand Down
Loading