From d4bf4c4f626ae0e3011ae4c3772c24804cb06887 Mon Sep 17 00:00:00 2001 From: Filip Jeretina Date: Wed, 19 Mar 2025 12:20:55 +0100 Subject: [PATCH 1/3] stress-test: Use intensity leds api --- utilities/stress_test.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/utilities/stress_test.py b/utilities/stress_test.py index 7d7a1e002..88493336c 100644 --- a/utilities/stress_test.py +++ b/utilities/stress_test.py @@ -137,6 +137,9 @@ def create_yolo(pipeline: dai.Pipeline, camera: dai.node.ColorCamera) -> Tuple[s def clamp(num, v0, v1): return max(v0, min(num, v1)) +DOT_STEP = 0.05 +FLOOD_STEP = 0.05 + class PipelineContext: q_name_yolo_passthrough: Optional[str] = None """The name of the queue that the YOLO spatial detection network passthrough is connected to.""" @@ -150,8 +153,8 @@ def stress_test(mxid: str = ""): # May have some unknown args args, _ = parser.parse_known_args() - dot_intensity = 500 - flood_intensity = 500 + dot_intensity = 0.5 + flood_intensity = 0.5 iso = 800 exp_time = 20000 @@ -162,9 +165,9 @@ def stress_test(mxid: str = ""): cam_args.append(device_info) with dai.Device(*cam_args) as device: print("Setting default dot intensity to", dot_intensity) - device.setIrLaserDotProjectorBrightness(dot_intensity) + device.setIrLaserDotProjectorIntensity(dot_intensity) print("Setting default flood intensity to", flood_intensity) - device.setIrFloodLightBrightness(flood_intensity) + device.setIrFloodLightIntensity(flood_intensity) pipeline, outputs, pipeline_context = build_pipeline(device, args) device.startPipeline(pipeline) start_time = time.time() @@ -230,21 +233,21 @@ def stress_test(mxid: str = ""): print("Q Pressed, exiting stress test...") break elif key == ord('a'): - dot_intensity = clamp(dot_intensity - 100, 0, 1200) + dot_intensity = clamp(dot_intensity - DOT_STEP, 0, 1.0) print("Decreasing dot intensity by 100, new value:", dot_intensity) - device.setIrLaserDotProjectorBrightness(dot_intensity) + device.setIrLaserDotProjectorIntensity(dot_intensity) elif key == ord('d'): - dot_intensity = clamp(dot_intensity + 100, 0, 1200) + dot_intensity = clamp(dot_intensity + DOT_STEP, 0, 1.0) print("Increasing dot intensity by 100, new value:", dot_intensity) - device.setIrLaserDotProjectorBrightness(dot_intensity) + device.setIrLaserDotProjectorIntensity(dot_intensity) elif key == ord('w'): - flood_intensity = clamp(flood_intensity + 100, 0, 1500) + flood_intensity = clamp(flood_intensity + FLOOD_STEP, 0, 1.0) print("Increasing flood intensity by 100, new value:", flood_intensity) - device.setIrFloodLightBrightness(flood_intensity) + device.setIrFloodLightIntensity(flood_intensity) elif key == ord('s'): - flood_intensity = clamp(flood_intensity - 100, 0, 1500) + flood_intensity = clamp(flood_intensity - FLOOD_STEP, 0, 1.0) print("Decreasing flood intensity by 100, new value:", flood_intensity) - device.setIrFloodLightBrightness(flood_intensity) + device.setIrFloodLightIntensity(flood_intensity) elif key == ord('k'): iso = clamp(iso - 50, 0, 1600) print("Decreasing iso by 50, new value:", iso) From 8e8a2a8c8da50086f27c43ca89628158f1c76d33 Mon Sep 17 00:00:00 2001 From: Filip Jeretina Date: Tue, 22 Apr 2025 15:00:10 +0200 Subject: [PATCH 2/3] feat: Add datetime timestamps to stress tests --- utilities/stress_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utilities/stress_test.py b/utilities/stress_test.py index 88493336c..d4f0d710a 100644 --- a/utilities/stress_test.py +++ b/utilities/stress_test.py @@ -3,6 +3,7 @@ import cv2 import numpy as np import signal +import datetime def on_exit(sig, frame): cv2.destroyAllWindows() @@ -170,7 +171,6 @@ def stress_test(mxid: str = ""): device.setIrFloodLightIntensity(flood_intensity) pipeline, outputs, pipeline_context = build_pipeline(device, args) device.startPipeline(pipeline) - start_time = time.time() queues = [device.getOutputQueue(name, size, False) for name, size in outputs if name != "sys_log"] camera_control_q = device.getInputQueue("cam_control") @@ -221,7 +221,7 @@ def stress_test(mxid: str = ""): sys_info: dai.SystemInformation = sys_info_q.tryGet() if sys_info: print("----------------------------------------") - print(f"[{int(time.time() - start_time)}s] Usb speed {usb_speed}") + print(f"[{datetime.datetime.now().strftime('%d/%m/%Y, %H:%M:%S')}] Usb speed {usb_speed}") print("----------------------------------------") print_system_information(sys_info) for name, frame in last_frame.items(): From 82f205156fe09d4116f5bab171c5b3ceda9994e2 Mon Sep 17 00:00:00 2001 From: Filip Jeretina Date: Tue, 22 Apr 2025 15:39:54 +0200 Subject: [PATCH 3/3] fix: flush stdout so that logs aren't buffered when streaming to a file. --- utilities/stress_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utilities/stress_test.py b/utilities/stress_test.py index d4f0d710a..8b78d36cd 100644 --- a/utilities/stress_test.py +++ b/utilities/stress_test.py @@ -4,6 +4,7 @@ import numpy as np import signal import datetime +import sys def on_exit(sig, frame): cv2.destroyAllWindows() @@ -83,6 +84,7 @@ def print_system_information(info: dai.SystemInformation): * 100, info.leonMssCpuUsage.average * 100) ) + sys.stdout.flush() def get_or_download_yolo_blob() -> str: