diff --git a/README.md b/README.md index 168938749..20af84c36 100644 --- a/README.md +++ b/README.md @@ -141,3 +141,18 @@ Afterwards, navigate to desired application as listed below + +### [⚙️ Dynamic Calibration](dynamic-calibration/) + + + + + + +
+ Dynamic Calibration + + Example of showing off the dynamic recalibration in action +

+ ⚙️ Dynamic calibration +
diff --git a/dynamic-calibration/.oakappignore b/dynamic-calibration/.oakappignore new file mode 100644 index 000000000..b7ff68f53 --- /dev/null +++ b/dynamic-calibration/.oakappignore @@ -0,0 +1,37 @@ +# Python virtual environments +venv/ +.venv/ + +# Node.js +# ignore node_modules, it will be reinstalled in the container +node_modules/ + +# Multimedia files +media/ + +# Documentation +README.md + +# VCS +.git/ +.github/ +.gitlab/ + +# Ignore specific files in utils/ +utils/dynamic_calibration_interative.py +utils/helper_functions.py + +# The following files are ignored by default +# uncomment a line if you explicitly need it + +# !*.oakapp + +# Python +# !**/.mypy_cache/ +# !**/.ruff_cache/ + +# IDE files +# !**/.idea +# !**/.vscode +# !**/.zed + diff --git a/dynamic-calibration/README.md b/dynamic-calibration/README.md new file mode 100644 index 000000000..a391f1b85 --- /dev/null +++ b/dynamic-calibration/README.md @@ -0,0 +1,116 @@ +# Stereo Dynamic Calibration + +This example demonstrates **runtime stereo camera calibration** with the `DynamicCalibration` node, plus a host-side controller/visualizer that overlays helpful UI (help panel, coverage bar, quality/recalibration modals, and a depth ROI HUD). + +> Works in **peripheral mode**: the device performs calibration; the host sends commands and renders overlays. + +## Features + +- **Interactive commands**: start/force recalibration, load images, run quality checks, apply/rollback calibrations, and **flash** (EEPROM) new/previous/factory calibration. +- **Coverage bar**: centered, large progress bar while collecting frames (or briefly after `l`). +- **Quality modal**: big 3-color bar (GOOD / COULD BE IMPROVED / NEEDS RECALIBRATION) with a pointer based on rotation change and a summary of depth-error deltas. +- **Recalibration modal**: summary with Euler angle deltas and depth-error deltas; prompts to flash if there is a significant change. +- **Depth HUD**: optional, shows depth/disp at a movable ROI (center + mean), with a small “tiny box” indicator. +- **Always-on help panel** (toggleable). + +## Demo + +

+ demo +

+ +## Requirements + +- A **Luxonis device** connected via USB/Ethernet. +- Packages: + - `depthai` + - `depthai-nodes` + - `opencv-python` + - `numpy` + +Install via: + +```bash +pip install -r requirements.txt +``` + +## Run + +```bash +python3 main.py +# or +python3 main.py --fps_limit 10 +# or +python3 main.py --device 18443010C1BA9D1200 +``` + +When launched, the app starts a RemoteConnection server. Open the visualizer at: + +``` +http://localhost:8082 +``` + +Replace `localhost` with your host IP if viewing from another machine. + +## Controls + +Use these keys while the app is running (focus the browser visualizer window): + +| Key | Action | +| ----------- | ---------------------------------------------------------------------------------------- | +| `q` | Quit the app | +| `h` | Toggle help panel | +| `g` | Toggle Depth HUD (ROI readout) | +| `r` | Start recalibration | +| `d` | **Force** recalibration | +| `l` | Load image(s) for calibration (shows coverage bar for ~2s) | +| `c` | Calibration quality check | +| `v` | **Force** calibration quality check | +| `n` | Apply **NEW** calibration (when available) | +| `o` | Apply **PREVIOUS** calibration (rollback) | +| `p` | **Flash NEW/current** calibration to EEPROM | +| `k` | **Flash PREVIOUS** calibration to EEPROM | +| `f` | **Flash FACTORY** calibration to EEPROM | +| `w / a / s` | Move ROI up/left/down (Depth HUD).
**Note:** `d` is reserved for *Force recalibrate*. | +| `z / x` | ROI size − / + | + +> **Status banners** appear in the **center** after critical actions (e.g., applying/ flashing calibration) and auto-hide after ~2s.\ +> **Modals** (quality/recalibration) also appear centered and auto-hide after ~3.5s or on any key press. + +## On‑screen UI Cheat Sheet + +- **Help panel** (top-left): quick reference of all keys (toggle with `h`). +- **Coverage bar** (center): big progress bar while collecting frames; also shown briefly (≈2s) after pressing `l`. +- **Quality modal** (center): three colored segments (green/yellow/red) with a **downward** pointer (`▼`) indicating rotation-change severity; optional line with depth-error deltas (@1m/2m/5m/10m). +- **Recalibration modal** (center): “Recalibration complete”, significant-axis warning (if any), Euler angles, and depth-error deltas; suggests flashing if the change is significant. +- **Depth HUD** (inline): shows depth/disp at the ROI center and mean within a tiny box; move with `w/a/s` (and resize with `z/x`). + +## Output (console) + +- **Coverage**: per-cell coverage and acquisition status when emitted by the device. +- **Calibration results**: prints when a new calibration is produced and shows deltas: + - Rotation delta `|| r_current - r_new ||` in degrees, + - Mean Sampson error (new vs. current), + - Theoretical **Depth Error Difference** at 1/2/5/10 meters. +- **Quality checks**: same metrics as above without actually applying a new calibration. + +## Tips & Notes + +- To **flash** (EEPROM) from the UI you must pass the `device` into the controller (`dyn_ctrl.set_device(device)`). +- If you link **disparity** instead of **depth** to the controller, call `dyn_ctrl.set_depth_units_is_mm(False)` so the HUD labels use “Disp” instead of meters. +- The coverage percentage accepts either `[0..1]` or `[0..100]` from the device; the controller auto-detects and normalizes. +- The **Collecting frames** bar hides automatically 2s after pressing `l`; during active recalibration (`r`/`d`) it stays up until calibration finishes. + +## Installation (dev quick start) + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -U pip +pip install -r requirements.txt +python3 main.py +``` + +______________________________________________________________________ + +If you use this as a base for your own app, the heart of the UX is `utils/dynamic_controler.py` — it wires `DynamicCalibration` queues and renders all overlays via `ImgAnnotations` so you don’t need `cv2.imshow()`. diff --git a/dynamic-calibration/main.py b/dynamic-calibration/main.py new file mode 100644 index 000000000..92e49943e --- /dev/null +++ b/dynamic-calibration/main.py @@ -0,0 +1,86 @@ +import cv2 + +from depthai_nodes.node import ApplyColormap +import depthai as dai + +from utils.dynamic_controler import DynamicCalibrationControler +from utils.arguments import initialize_argparser + +_, args = initialize_argparser() + +visualizer = dai.RemoteConnection(httpPort=8082) +device = dai.Device(dai.DeviceInfo(args.device)) if args.device else dai.Device() +# ---------- Pipeline definition ---------- +with dai.Pipeline(device) as pipeline: + # Create camera nodes + cam_left = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) + cam_right = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C) + + # Request full resolution NV12 outputs + left_out = cam_left.requestFullResolutionOutput( + dai.ImgFrame.Type.NV12, fps=args.fps_limit + ) + right_out = cam_right.requestFullResolutionOutput( + dai.ImgFrame.Type.NV12, fps=args.fps_limit + ) + + # Stereo node + stereo = pipeline.create(dai.node.StereoDepth) + left_out.link(stereo.left) + right_out.link(stereo.right) + + # Dynamic calibration node + dyn_calib = pipeline.create(dai.node.DynamicCalibration) + left_out.link(dyn_calib.left) + right_out.link(dyn_calib.right) + + # Output queues + depth_parser = pipeline.create(ApplyColormap).build(stereo.disparity) + # depth_parser.setMaxValue(int(stereo.initialConfig.getMaxDisparity())) # NOTE: Uncomment when DAI fixes a bug + depth_parser.setColormap(cv2.COLORMAP_JET) + + calibration = device.readCalibration() + new_calibration = None + old_calibration = None + + # --- existing --- + calibration_output = dyn_calib.calibrationOutput.createOutputQueue() + coverage_output = dyn_calib.coverageOutput.createOutputQueue() + quality_output = dyn_calib.qualityOutput.createOutputQueue() + input_control = dyn_calib.inputControl.createInputQueue() + device.setCalibration(calibration) + + # ---------- Visualizer setup ---------- + visualizer.addTopic("Left", stereo.syncedLeft, "images") + visualizer.addTopic("Right", stereo.syncedRight, "images") + visualizer.addTopic("Depth", depth_parser.out, "images") + + dyn_ctrl = pipeline.create(DynamicCalibrationControler).build( + preview=depth_parser.out, # for timestamped overlays + depth=stereo.depth, # raw uint16 depth in mm + ) + visualizer.addTopic("DynCalib HUD", dyn_ctrl.out_annotations, "images") + + pipeline.start() + visualizer.registerPipeline(pipeline) + + # give it queues + dyn_ctrl.set_coverage_output(coverage_output) + dyn_ctrl.set_calibration_output(calibration_output) + dyn_ctrl.set_command_input(input_control) + dyn_ctrl.set_quality_output(quality_output) + dyn_ctrl.set_depth_units_is_mm(True) # True for stereo.depth, False for disparity + dyn_ctrl.set_device(device) + + # (optional) Set current calibration + try: + dyn_ctrl.set_current_calibration(device.readCalibration()) + except Exception: + pass + + while pipeline.isRunning(): + key = visualizer.waitKey(1) + if key != -1: + dyn_ctrl.handle_key_press(key) + if dyn_ctrl.wants_quit: + break diff --git a/dynamic-calibration/media/dcl.gif b/dynamic-calibration/media/dcl.gif new file mode 100644 index 000000000..6a5721004 Binary files /dev/null and b/dynamic-calibration/media/dcl.gif differ diff --git a/dynamic-calibration/oakapp.toml b/dynamic-calibration/oakapp.toml new file mode 100644 index 000000000..fb7646820 --- /dev/null +++ b/dynamic-calibration/oakapp.toml @@ -0,0 +1,15 @@ +identifier = "com.example.dynamic-calibration" +app_version = "1.0.0" + +prepare_container = [ + { type = "RUN", command = "apt-get update" }, + { type = "RUN", command = "apt-get install -y python3-pip" }, + { type = "COPY", source = "requirements.txt", target = "requirements.txt" }, + { type = "RUN", command = "pip3 install -r /app/requirements.txt --break-system-packages" }, +] + +prepare_build_container = [] + +build_steps = [] + +entrypoint = ["bash", "-c", "python3 -u /app/main.py"] diff --git a/dynamic-calibration/requirements.txt b/dynamic-calibration/requirements.txt new file mode 100644 index 000000000..cb97be449 --- /dev/null +++ b/dynamic-calibration/requirements.txt @@ -0,0 +1,6 @@ +--extra-index-url https://artifacts.luxonis.com/artifactory/luxonis-python-snapshot-local +depthai==3.0.0 +depthai-nodes==0.3.4 +numpy>=1.22 +opencv-python==4.10.0.84 +opencv-contrib-python==4.10.0.84 diff --git a/dynamic-calibration/utils/__init__.py b/dynamic-calibration/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dynamic-calibration/utils/arguments.py b/dynamic-calibration/utils/arguments.py new file mode 100644 index 000000000..5a75645b8 --- /dev/null +++ b/dynamic-calibration/utils/arguments.py @@ -0,0 +1,31 @@ +import argparse + + +def initialize_argparser(): + """Initialize the argument parser for the script.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.description = "This example showcases the capabinility of Dynamic Calibration on OAK devices. " + + parser.add_argument( + "-d", + "--device", + help="Optional name, DeviceID or IP of the camera to connect to.", + required=False, + default=None, + type=str, + ) + + parser.add_argument( + "-fps", + "--fps_limit", + help="FPS limit for the model runtime.", + required=False, + default=10, + type=int, + ) + + args = parser.parse_args() + + return parser, args diff --git a/dynamic-calibration/utils/dynamic_calibration_interactive.py b/dynamic-calibration/utils/dynamic_calibration_interactive.py new file mode 100644 index 000000000..a7f0291c6 --- /dev/null +++ b/dynamic-calibration/utils/dynamic_calibration_interactive.py @@ -0,0 +1,543 @@ +from collections import deque +import numpy as np +import cv2 +import time + +import depthai as dai +from helper_functions import ( + update_master_frame, + overlay_coverage_on_gray, + draw_recalibration_message, + draw_health_bar, + display_text, + draw_key_commands, + print_final_calibration_results, +) + +def nice_disparity_viz(in_disp_frame): + # 1) to float, keep a valid mask + disp = in_disp_frame.astype(np.float32) + valid = disp > 0 + + # If everything is invalid, just return a gray image + if not np.any(valid): + vis = np.zeros((*disp.shape, 3), np.uint8) + vis[:] = 127 + return vis + + # 2) robust range via percentiles on valid pixels + vals = disp[valid] + p_low, p_high = np.percentile(vals, (2, 98)) + + # Guard: widen if scene is too flat (avoid p_high == p_low) + if p_high - p_low < 1e-6: + p_low = float(vals.min()) + p_high = float(vals.max() + 1e-6) + + # 3) clip & normalize to 0–255 + disp_clipped = np.clip(disp, p_low, p_high) + disp_norm = (disp_clipped - p_low) / (p_high - p_low) + disp_u8 = np.uint8(np.clip(disp_norm * 255.0, 0, 255)) + + # 4) boost mid-tones locally (CLAHE) + # Helps a lot when the map is mostly one color + clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) + disp_eq = clahe.apply(disp_u8) + + # 5) apply a perceptual colormap (TURBO if available) + colormap = getattr(cv2, "COLORMAP_TURBO", cv2.COLORMAP_JET) + vis = cv2.applyColorMap(disp_eq, colormap) + return vis + +# Rolling average for depth ROI means +DEPTH_HISTORY_SIZE = 10 +depth_history = deque(maxlen=DEPTH_HISTORY_SIZE) +last_mouse_pos = None # reset history if mouse moves to new pixel +mouse_coords = (-1, -1) +roi_size = 10 +step_roi = 5 + + +def on_mouse_disparity(event, x, y, flags, param): + global mouse_coords + if event == cv2.EVENT_MOUSEMOVE: + mouse_coords = (x, y) + + +# ---------- Pipeline definition ---------- +pipeline = dai.Pipeline() + +# Create camera nodes +cam_left = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) +cam_right = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C) + +# Request full resolution NV12 outputs +left_out = cam_left.requestFullResolutionOutput(dai.ImgFrame.Type.NV12, fps=5) +right_out = cam_right.requestFullResolutionOutput(dai.ImgFrame.Type.NV12, fps=5) +# Stereo node +stereo = pipeline.create(dai.node.StereoDepth) +stereo.setExtendedDisparity(True) +stereo.setSubpixel(True) +left_out.link(stereo.left) +right_out.link(stereo.right) + +# Dynamic calibration node +dyn_calib = pipeline.create(dai.node.DynamicCalibration) +left_out.link(dyn_calib.left) +right_out.link(dyn_calib.right) + + +# Output queues +left_xout = stereo.syncedLeft.createOutputQueue() +right_xout = stereo.syncedRight.createOutputQueue() +disp_xout = stereo.disparity.createOutputQueue() +depth_xout = stereo.depth.createOutputQueue() + +calibration_output = dyn_calib.calibrationOutput.createOutputQueue() +coverage_output = dyn_calib.coverageOutput.createOutputQueue() +quality_output = dyn_calib.qualityOutput.createOutputQueue() + +command_input = dyn_calib.inputControl.createInputQueue() + +device = pipeline.getDefaultDevice() +calibNew = device.readCalibration() +calibOld = device.readCalibration() +device.setCalibration(calibOld) +# ---------- Device and runtime loop ---------- +pipeline.start() +start = time.time() +leftFrame = None +rightFrame = None +collectFrames = False +depthDiff = [] +rotationDiff = [] +displayTimer = time.time() +state = "" +print( + "<<< -----------------------------|Introduction|------------------------------->>>" +) +print("Key commands:") +print("[c] -> Calibration quality check") +print("[r] -> Recalibrate") +print("[a] -> Force calibration check") +print("[d] -> Force recalibrate") +print("[l] Load image") +print("[n] -> Apply new calibration") +print("[o] -> Apply old calibration") +print("[s] -> Flash new calibration") +print("[k] -> Flash old calibration") +print("[f] -> Flash factory calibration") +print("[x] -> Save current frames.") +print("[q] -> Quit") +print( + "<<< -----------------------------|Start the pipeline!|------------------------->>>" +) +cv2.namedWindow("MasterFrame", cv2.WINDOW_NORMAL) +window_size = [1920, 1200] +cv2.resizeWindow("MasterFrame", window_size[0], window_size[1]) +cv2.setMouseCallback("MasterFrame", on_mouse_disparity) +text = "" +overall_coverage = None +overall_dataAcquired = 0 +with pipeline: + max_disp = stereo.initialConfig.getMaxDisparity() + while True: + now = time.time() + in_left = left_xout.get() + in_right = right_xout.get() + in_disp = disp_xout.get() + in_depth = depth_xout.get() + running_calibration = device.getCalibration() + M = np.array( + running_calibration.getCameraIntrinsics(dai.CameraBoardSocket.CAM_B) + ) + coverage = coverage_output.tryGet() + calibration_result = calibration_output.tryGet() + + fourthFrame = np.zeros((800, 1280, 3), dtype=np.uint8) + if in_disp: + assert isinstance(in_disp, dai.ImgFrame) + disp_frame = in_disp.getFrame() + disp_vis = nice_disparity_viz(disp_frame) + + if in_depth: + assert isinstance(in_depth, dai.ImgFrame) + depth_frame = in_depth.getFrame() + + if in_left: + assert isinstance(in_left, dai.ImgFrame) + leftFrame = in_left.getCvFrame() + + if in_right: + assert isinstance(in_right, dai.ImgFrame) + rightFrame = in_right.getCvFrame() + + if calibration_result: + print(f"Intermediate Information: {calibration_result.info}") + + if calibration_result and calibration_result.calibrationData: + state = "Recalibration" + finalDisplay = True + rotationDiff = np.array( + getattr( + calibration_result.calibrationData.calibrationDifference, + "rotationChange", + [], + ).copy() + ) + depthDiff = getattr( + calibration_result.calibrationData.calibrationDifference, + "depthErrorDifference", + [], + ).copy() + new_calibration = calibration_result.calibrationData.newCalibration + calibNew = new_calibration + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.ApplyCalibration( + new_calibration + ) + ) + ) + print("Applying new calibration.") + + if coverage: + overall_coverage = coverage.coveragePerCellA + overall_dataAcquired = coverage.dataAcquired + + masterFrame = update_master_frame(leftFrame, rightFrame, disp_vis, fourthFrame) + + if state == "CollectFrames" and overall_coverage is not None: + leftFrame = overlay_coverage_on_gray( + leftFrame, overall_coverage, overall_dataAcquired + ) + rightFrame = overlay_coverage_on_gray( + rightFrame, overall_coverage, overall_dataAcquired + ) + masterFrame = update_master_frame( + leftFrame, rightFrame, disp_vis, fourthFrame + ) + + elif state == "Recalibration" and finalDisplay: + leftFrame = draw_recalibration_message(leftFrame, depthDiff, rotationDiff) + rightFrame = draw_recalibration_message(rightFrame, depthDiff, rotationDiff) + fourthFrame = draw_recalibration_message( + fourthFrame, depthDiff, rotationDiff + ) + masterFrame = update_master_frame( + leftFrame, rightFrame, disp_vis, fourthFrame + ) + overall_coverage = None # Reset coverage after recalibration + overall_dataAcquired = 0 + key_end = cv2.waitKey(1) + if key_end != -1: # -1 means no key was pressed + finalDisplay = False + + elif state == "Calibration check" and finalDisplay: + fourthFrame = draw_health_bar( + fourthFrame, + depthDiff, + rotationDiff, + display_text=f"{text} Health Bad of Depth Difference Error", + ) + masterFrame = update_master_frame( + leftFrame, rightFrame, disp_vis, fourthFrame + ) + key_end = cv2.waitKey(1) + if key_end != -1: # -1 means no key was pressed + finalDisplay = False + + elif state == "Display Text" and finalDisplay: + display_text(masterFrame, text) + if np.abs(start_time - now) > 2: + finalDisplay = False + else: + draw_key_commands(fourthFrame) + masterFrame = update_master_frame( + leftFrame, rightFrame, disp_vis, fourthFrame + ) + + disp_x_start, disp_y_start = 0, masterFrame.shape[0] // 2 + disp_x_end, disp_y_end = masterFrame.shape[1] // 2, window_size[0] // 2 + scale_x = masterFrame.shape[1] / window_size[0] + scale_y = masterFrame.shape[0] / window_size[1] + scaled_mouse_x = int(mouse_coords[0] * scale_x) + scaled_mouse_y = int(mouse_coords[1] * scale_y) + if ( + disp_x_start <= scaled_mouse_x < disp_x_end + and disp_y_start <= scaled_mouse_y < disp_y_end + and in_disp + ): + local_x = scaled_mouse_x # disp_vis x offset + local_y = scaled_mouse_y - masterFrame.shape[0] // 2 # disp_vis y offset + if ( + 0 <= local_x < depth_frame.shape[1] + and 0 <= local_y < depth_frame.shape[0] + ): + cy = int(round(local_y * 2)) + cx = int(round(local_x * 2)) + half = roi_size + y0 = max(0, cy - half) + y1 = min(depth_frame.shape[0], cy + half + 1) + x0 = max(0, cx - half) + x1 = min(depth_frame.shape[1], cx + half + 1) + + roi = depth_frame[y0:y1, x0:x1].astype(np.float32) + + # Rolling-mean over last 10 frames; clears if cursor center changes + cur_mouse_pos = (cx, cy) + if last_mouse_pos != cur_mouse_pos: + depth_history.clear() + last_mouse_pos = cur_mouse_pos + + valid = roi > 0 + if np.any(valid): + depth_mm = float(roi[valid].mean()) # in millimeters + depth_history.append(depth_mm) + # else: keep previous history if current ROI is invalid + + if len(depth_history) > 0: + depth_val = float(np.mean(depth_history)) / 1000.0 # mm -> m + else: + depth_val = float("nan") + + # === Compute Euclidean distance using intrinsics === + # Intrinsic matrix parameters (from M) + fx = M[0, 0] + fy = M[1, 1] + cx_intr = M[0, 2] + cy_intr = M[1, 2] + + # Compute normalized pixel coordinates (optical center offset) + X = (cx - cx_intr) * depth_val / fx + Y = (cy - cy_intr) * depth_val / fy + Z = depth_val + + euclidean_dist = float(np.sqrt(X**2 + Y**2 + Z**2)) # meters + # === Display text in two lines === + text_pos_1 = (scaled_mouse_x + 10, scaled_mouse_y + 10) + text_pos_2 = (scaled_mouse_x + 10, scaled_mouse_y + 30) + + display_text_depth = f"Depth: {depth_val:.2f} m" + display_text_euc = f"3D Dist: {euclidean_dist:.2f} m" + + # Example of drawing (depends on your OpenCV overlay logic): + cv2.putText( + masterFrame, + display_text_depth, + text_pos_1, + cv2.FONT_HERSHEY_SIMPLEX, + 0.6, + (0, 255, 0), + 1, + cv2.LINE_AA, + ) + cv2.putText( + masterFrame, + display_text_euc, + text_pos_2, + cv2.FONT_HERSHEY_SIMPLEX, + 0.6, + (255, 255, 0), + 1, + cv2.LINE_AA, + ) + + # ROI display area (unchanged) + disp_x0 = disp_x_start + (x0 // 2) + disp_x1 = disp_x_start + ((x1 - 1) // 2 + 1) + disp_y0 = disp_y_start + (y0 // 2) + disp_y1 = disp_y_start + ((y1 - 1) // 2 + 1) + + overlay = masterFrame.copy() + cv2.rectangle( + overlay, (disp_x0, disp_y0), (disp_x1, disp_y1), (0, 0, 255), -1 + ) + alpha = 0.6 + masterFrame = cv2.addWeighted( + overlay, alpha, masterFrame, 1 - alpha, 0, masterFrame + ) + + if leftFrame is not None and rightFrame is not None: + cv2.imshow("MasterFrame", cv2.resize(masterFrame, (1920, 1200))) + + key = cv2.waitKey(1) + + if key == ord("q"): + cv2.destroyAllWindows() + exit() + break + + if key == ord("c"): + depthDiff = [] + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.LoadImage() + ) + ) + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.CalibrationQuality(True) + ) + ) + quality = quality_output.get() + state = "Calibration check" + finalDisplay = True + depthDiff = getattr(quality.qualityData, "depthErrorDifference", []).copy() + rotationDiff = getattr(quality.qualityData, "rotationChange", []).copy() + print_final_calibration_results(quality.qualityData, state) + print("Sending command for calibQualityCheck") + + elif key == ord("r"): + depthDiff = [] + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.StartCalibration() + ) + ) + state = "CollectFrames" + print("Sending command for recalibration") + + elif key == ord("a"): + depthDiff = [] + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.LoadImage() + ) + ) + coverage = coverage_output.tryGet() + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.CalibrationQuality(True) + ) + ) + quality = quality_output.get() + state = "Calibration check" + depthDiff = getattr(quality.qualityData, "depthErrorDifference", []).copy() + rotationDiff = getattr(quality.qualityData, "rotationChange", []).copy() + finalDisplay = True + print_final_calibration_results(quality.qualityData, state) + print("Sending command for forced calibQualityCheck") + + elif key == ord("d"): + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.Calibrate(True) + ) + ) + state = "Recalibration" + finalDisplay = True + print("Sending command for forced recalibration.") + + elif key == ord("n"): + state = "Display Text" + text = "New calibration applied successfully." + finalDisplay = True + print("Device applying new calibration") + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.ApplyCalibration(calibNew) + ) + ) + print("New calibration applied successfully.") + start_time = time.time() + + elif key == ord("o"): + state = "Display Text" + text = "Old calibration applied successfully." + finalDisplay = True + print("Device applying old calibration") + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.ApplyCalibration(calibOld) + ) + ) + print("Old calibration applied successfully.") + start_time = time.time() + + elif key == ord("s"): + state = "Display Text" + text = "New calibration flashed successfully." + finalDisplay = True + print("Device flasing new calibration") + device.flashCalibration(calibNew) + print("New calibration flashed successfully.") + start_time = time.time() + + elif key == ord("k"): + state = "Display Text" + text = "Old calibration flashed successfully." + finalDisplay = True + print("Device flasing old calibration") + device.flashCalibration(calibOld) + print("Old calibration flashed successfully.") + start_time = time.time() + elif key == ord("f"): + state = "Display Text" + text = "Factory calibration flashed successfully." + finalDisplay = True + print("Device flashing factory calibration") + device.flashCalibration(device.readFactoryCalibration()) + print("Factory calibration flashed successfully.") + start_time = time.time() + elif key == ord("l"): + state = "Display Text" + text = "Image loaded successfully." + finalDisplay = True + print("Loading image ... ") + command_input.send( + dai.DynamicCalibrationControl( + dai.DynamicCalibrationControl.Commands.LoadImage() + ) + ) + print("Image loaded successfully.") + start_time = time.time() + + elif key == ord("+"): + state = "Display Text" + text = f"Increase ROI size by 1. \nCurrent size {roi_size // 2}x {roi_size // 2}" + finalDisplay = True + if roi_size > step_roi: + roi_size += step_roi + else: + roi_size += step_roi + start_time = time.time() + + elif key == ord("-"): + state = "Display Text" + text = f"Decrease ROI size by 1. \nCurrent size {roi_size // 2}x {roi_size // 2}" + finalDisplay = True + if roi_size > step_roi: + roi_size -= step_roi + else: + roi_size = step_roi + start_time = time.time() + + elif key == ord("x"): + state = "Display Text" + text = "Saved current frames to img_left_*.npy and img_right_*.npy" + finalDisplay = True + print("Saved current frames to img_left_*.npy and img_right_*.npy.") + import os + + path = os.path.dirname(os.path.abspath(__file__)) + folder = os.path.join(path, f"dynamic_calib_dataset_{device.getMxId()}/") + import json + + if not os.path.exists(folder): + os.makedirs(folder, exist_ok=True) + i = time.time() - start + with open(f"{folder}calibration_before.json", "w") as f: + json.dump(device.readCalibration().eepromToJson(), f, indent=4) + print("Saved current calibration to calibration_before.json") + np.save(f"{folder}img_left_{i}.npy", leftFrame) + np.save(f"{folder}img_right_{i}.npy", rightFrame) + print("Saved current frames to img_left_*.npy and img_right_*.npy") + + calibFactory = device.readFactoryCalibration() + device.readCalibration().eepromToJsonFile(f"{folder}calibration.json") + calibFactory.eepromToJsonFile(f"{folder}factoryCalibration.json") + start_time = time.time() + + print("Finished saving dataset.") + pipeline.stop() diff --git a/dynamic-calibration/utils/dynamic_controler.py b/dynamic-calibration/utils/dynamic_controler.py new file mode 100644 index 000000000..94839dfde --- /dev/null +++ b/dynamic-calibration/utils/dynamic_controler.py @@ -0,0 +1,1059 @@ +from typing import Optional, Tuple +from collections import deque +import numpy as np +import time + +import depthai as dai + + +class DynamicCalibrationControler(dai.node.HostNode): + def __init__(self): + super().__init__() + # queues you set after pipeline.start() + self._cmd_q = None + self._quality_q = None + + # extra input for raw depth/disparity frames + self.in_depth = self.createInput( + types=[dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)], + queueSize=1, + blocking=False, + waitForMessage=False, + ) + + self._device: Optional[dai.Device] = None # for flash operations + + # state + self.calibration: Optional[dai.CalibrationHandler] = None + self.new_calibration: Optional[dai.CalibrationHandler] = None + self.old_calibration: Optional[dai.CalibrationHandler] = None + self.last_quality = None + self.wants_quit = False + self._show_help = True + + self._calib_q = None # queue for dyn_calib.calibrationOutput + self._last_calib_diff = None # latest calibrationDifference + self.auto_apply_new = True # set False if you prefer pressing 'n' + + self._coverage_q = None + self._coverage_vec = ( + None # flattened coveragePerCellA (0..1 or 0..100 per cell) + ) + self._coverage_pct = 0.0 # mean coverage in % + self._data_acquired = 0.0 # dataAcquired from device (0..1 or 0..100) + self._collecting = False # are we currently collecting frames? + self._collecting_until = 0.0 + + # transient status banner (2s) + self._status_text = None + self._status_expire_ts = 0.0 + + # modal overlays (centered), auto-hide or dismiss on key + self._modal_kind = None # None | "recalib" | "quality" + self._modal_expire_ts = 0.0 + self._modal_payload = {} + + # depth HUD state + self._last_depth = None # np.ndarray (uint16 depth mm or disparity) + self._depth_is_mm = True # set False if you link disparity + self._cursor_xy = None # (x,y) in depth pixels + self._roi_half = 10 # radius in depth pixels (=> (2h+1)^2 window) + self._hud_on = True # toggle with 'g' + + self._avg_len = 10 + self._roi_mean_hist = deque(maxlen=self._avg_len) + self._center_hist = deque(maxlen=self._avg_len) + self._hist_cursor_xy = None + + self.out_annotations = self.createOutput( + possibleDatatypes=[ + dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgAnnotations, True) + ] + ) + + def set_device(self, device: dai.Device): + """Provide a device so we can flash calibrations from keypresses.""" + self._device = device + + # ---- external wiring ---- + def set_command_input(self, q): + self._cmd_q = q + + def set_quality_output(self, q): + self._quality_q = q + + def set_coverage_output(self, q): + """Pass dyn_calib.coverageOutput.createOutputQueue(...) here.""" + self._coverage_q = q + + # If you want to use disparity instead of depth (units in “levels”): + def set_depth_units_is_mm(self, is_mm: bool): + self._depth_is_mm = is_mm + + def set_calibration_output(self, q): + """Pass dyn_calib.calibrationOutput.createOutputQueue(...) here.""" + self._calib_q = q + + def _flash_status(self, text: str, seconds: float = 2.0): + self._status_text = text + self._status_expire_ts = time.time() + seconds + + def set_auto_apply_new(self, enable: bool): + self.auto_apply_new = bool(enable) + + def build(self, preview: dai.Node.Output, depth: dai.Node.Output = None): + # 'preview' is the stream we time-stamp overlays against (e.g., your colormapped depth) + self.link_args(preview) + if depth is not None: + depth.link(self.in_depth) + return self + + # ---- helpers ---- + def _send_cmd(self, cmd): + if self._cmd_q is None: + raise RuntimeError( + "Command queue not set. Call set_command_input(...) after pipeline.start()" + ) + self._cmd_q.send(dai.DynamicCalibrationControl(cmd)) + + def _coverage_bar_text(self, pct: float, width_chars: int = 36) -> str: + pct = float(np.clip(pct, 0.0, 100.0)) + filled = int(round((pct / 100.0) * width_chars)) + return "█" * filled + "░" * (width_chars - filled) + + def _draw_depth_hud(self, hud: dai.ImgAnnotation, preview_w: int, preview_h: int): + if self._last_depth is None or not self._hud_on: + return + + H, W = self._last_depth.shape[:2] + # init cursor at center the first time + if self._cursor_xy is None: + self._cursor_xy = (W // 2, H // 2) + + cx, cy = self._cursor_xy + cx = int(np.clip(cx, 0, W - 1)) + cy = int(np.clip(cy, 0, H - 1)) + + # ROI bounds + h = int(max(1, self._roi_half)) + x0, x1 = max(0, cx - h), min(W, cx + h + 1) + y0, y1 = max(0, cy - h), min(H, cy + h + 1) + roi = self._last_depth[y0:y1, x0:x1] + + # stats (ignore zeros if depth) + if self._hist_cursor_xy != (cx, cy): + self._roi_mean_hist.clear() + self._center_hist.clear() + self._hist_cursor_xy = (cx, cy) + + if self._depth_is_mm: + valid = roi > 0 + + # center (mm) + center_mm = float(self._last_depth[cy, cx]) + if center_mm > 0: + self._center_hist.append(center_mm) + + # roi mean (mm) + if np.any(valid): + roi_mean_mm = float(roi[valid].mean()) + self._roi_mean_hist.append(roi_mean_mm) + + # aggregated (m) + if len(self._center_hist) > 0: + center_val = float(np.mean(self._center_hist)) / 1000.0 + else: + center_val = float("nan") + + if len(self._roi_mean_hist) > 0: + mean_val = float(np.mean(self._roi_mean_hist)) / 1000.0 + else: + mean_val = float("nan") + + val_text = ( + f"Depth: {center_val:.2f} m, " + f"ROI mean: {mean_val:.2f} m " + f"(avg {len(self._roi_mean_hist)}/{self._avg_len})" + ) + else: + # disparity path (no zero filtering) + center_disp = float(self._last_depth[cy, cx]) + roi_mean_disp = float(roi.mean()) + + self._center_hist.append(center_disp) + self._roi_mean_hist.append(roi_mean_disp) + + center_val = float(np.mean(self._center_hist)) + mean_val = float(np.mean(self._roi_mean_hist)) + + val_text = ( + f"Disp: {center_val:.1f}, " + f"ROI mean: {mean_val:.1f} " + f"(avg {len(self._roi_mean_hist)}/{self._avg_len})" + ) + + # normalized positions relative to preview (assume 1:1 mapping depth→preview) + u = (cx + 0.5) / W + v = (cy + 0.5) / H + + # place readout slightly offset + hud.texts.append( + self._create_text_annot( + val_text, (min(u + 0.02, 0.95), min(v + 0.02, 0.95)) + ) + ) + + # "tiny box" corners using text markers (no special rect types needed) + corner = "□" + u0, v0 = x0 / W, y0 / H + u1, v1 = x1 / W, y1 / H + hud.texts.append(self._create_text_annot(corner, (u0, v0), bg=(0, 0, 0, 0))) + hud.texts.append(self._create_text_annot(corner, (u1, v0), bg=(0, 0, 0, 0))) + hud.texts.append(self._create_text_annot(corner, (u0, v1), bg=(0, 0, 0, 0))) + hud.texts.append(self._create_text_annot(corner, (u1, v1), bg=(0, 0, 0, 0))) + + # --- styling helpers (white on translucent black) --- + def _create_text_annot( + self, + text: str, + pos: Tuple[float, float], + size=14, + bg=(0, 0, 0, 0.55), + fg=(1, 1, 1, 1), + ): + t = dai.TextAnnotation() + t.fontSize = size + t.backgroundColor = dai.Color(*bg) + t.textColor = dai.Color(*fg) + t.position = dai.Point2f(*pos) # normalized [0..1] + t.text = text + return t + + # --- centered panels / modals --- + def _push_center_panel( + self, + img_annots: dai.ImgAnnotations, + lines, + *, + y_center=0.50, + width=0.74, + line_size=20, + line_gap=0.035, + accent_idx=None, + accent_colors=None, + ): + """ + Draws a centered panel with a single translucent background slab and multiple lines. + `accent_idx`: set of indices to color differently (e.g., titles/warnings). + `accent_colors`: dict idx->(r,g,b,a) for text, e.g. (1,0,0,1) for red. + """ + if not lines: + return + x0 = (1.0 - width) / 2.0 + x_text = x0 + 0.15 + n = len(lines) + total_h = n * line_gap + y0 = y_center - total_h / 2 + + # foreground text + hud = dai.ImgAnnotation() + for i, line in enumerate(lines): + fg = (1, 1, 1, 1) + if accent_idx and i in accent_idx and accent_colors and i in accent_colors: + fg = accent_colors[i] + hud.texts.append( + self._create_text_annot( + line, + (x_text, y0 + i * line_gap), + size=line_size, + bg=(0, 0, 0, 0), + fg=fg, + ) + ) + img_annots.annotations.append(hud) + + def _bar_string(self, length_chars, char="█"): + return char * max(0, int(length_chars)) + + def _push_quality_bar_modal( + self, img_annots: dai.ImgAnnotations, values, rotation, display_text="" + ): + """ + Big centered quality modal: + - Title: 'Calibration Quality: {GOOD | COULD BE IMPROVED | NEEDS RECALIBRATION}' + - 3 contiguous colored bar segments (green/yellow/red) packed closer together + - Downward pointer (▼) placed ABOVE the bar (pointing down at it) + - No text rendered below the bar + """ + if rotation is None or len(rotation) == 0: + self._push_center_panel( + img_annots, + [ + "Data is missing — please load more images with 'l'.", + "Press any key to continue …", + ], + y_center=0.42, + width=0.84, + line_size=20, + line_gap=0.04, + ) + return + + rot_max = float(np.max(np.abs(rotation))) + # thresholds (deg) + t1, t2 = 0.07, 0.15 + + # status text + title color + if rot_max <= t1: + status = "GOOD" + title_color = (0, 1, 0, 1) + elif rot_max <= t2: + status = "COULD BE IMPROVED" + title_color = (1, 1, 0, 1) + else: + status = "NEEDS RECALIBRATION" + title_color = (1, 0, 0, 1) + + # Centered title (use a narrower panel so it looks centered) + title = f"Calibration Quality: {status}" + self._push_center_panel( + img_annots, + [title], + y_center=0.42, + width=0.60, + line_size=24, + line_gap=0.04, + accent_idx={0}, + accent_colors={0: title_color}, + ) + + # Optional summary ABOVE the bar (never below) + if values is not None and len(values) >= 4: + summary = ( + f"Depth error @1m:{values[0]:.2f}%, 2m:{values[1]:.2f}%, " + f"5m:{values[2]:.2f}%, 10m:{values[3]:.2f}%" + ) + self._push_center_panel( + img_annots, + [summary], + y_center=0.62, + width=0.90, + line_size=18, + line_gap=0.035, + ) + + # Bar geometry (packed closer together) + x0, x1 = 0.20, 0.80 # tighter horizontal span + y_bar = 0.52 # bar vertical position + w = x1 - x0 + total_chars = 45 + seg_chars = total_chars // 3 + + good = self._bar_string(seg_chars) + mid = self._bar_string(seg_chars) + bad = self._bar_string(total_chars - 2 * seg_chars) + + # contiguous segments (minimal padding between thirds) + seg_dx = w / 3.0 + lefts = [x0 + 0.01, x0 + seg_dx + 0.005, x0 + 2 * seg_dx + 0.005] + + def _slice(x, y, s, color): + ann = dai.ImgAnnotation() + ann.texts.append( + self._create_text_annot(s, (x, y), size=26, bg=(0, 0, 0, 0), fg=color) + ) + img_annots.annotations.append(ann) + + _slice(lefts[0], y_bar, good, (0, 1, 0, 1)) # green + _slice(lefts[1], y_bar, mid, (1, 1, 0, 1)) # yellow + _slice(lefts[2], y_bar, bad, (1, 0, 0, 1)) # red + + # Downward pointer '▼' placed ABOVE the bar (pointing down at it) + if rot_max <= t1: + frac = (rot_max / t1) * (1.0 / 3.0) + elif rot_max <= t2: + frac = (1.0 / 3.0) + ((rot_max - t1) / (t2 - t1)) * (1.0 / 3.0) + else: + cap = max(t2, min(rot_max, 2.0 * t2)) + frac = (2.0 / 3.0) + ((cap - t2) / t2) * (1.0 / 3.0) + frac = min(frac, 0.999) + + x_ptr = x0 + frac * w + arrow = dai.ImgAnnotation() + arrow.texts.append( + self._create_text_annot( + "▼", (x_ptr, y_bar - 0.045), size=24, bg=(0, 0, 0, 0), fg=(1, 1, 1, 1) + ) + ) + img_annots.annotations.append(arrow) + + # NOTE: No labels or tail text are rendered under the bar. + + def _push_recalibration_modal(self, img_annots: dai.ImgAnnotations, values, angles): + """ + Recreates your draw_recalibration_message. + """ + lines = [] + accents = {} + if values is None or len(values) == 0 or angles is None or len(angles) < 3: + lines = [ + "Data is missing — please load more images with 'l'.", + "Press any key to continue …", + ] + self._push_center_panel( + img_annots, + lines, + y_center=0.42, + width=0.84, + line_size=20, + line_gap=0.04, + ) + return + + threshold = 0.075 + abs_ang = np.abs(np.asarray(angles)) + over = np.where(abs_ang > threshold)[0].tolist() + axis_names = ["Roll", "Pitch", "Yaw"] + lines.append("Recalibration complete") + accents[0] = (0, 1, 0, 1) # green title + + if over: + axes = ", ".join(axis_names[i] for i in over) + lines.append(f"Significant change in rotation! {axes}") + accents[1] = (1, 0, 0, 1) # red warning + lines.append("To permanently flash new calibration, press 's'!") + else: + lines.append("No significant change detected") + + lines.append( + f"Euler angles (deg): Roll={angles[0]:.2f}, " + f"Pitch={angles[1]:.2f}, Yaw={angles[2]:.2f}" + ) + if values is not None and len(values) >= 4: + lines.append( + f"Depth error @1m:{values[0]:.2f}%, 2m:{values[1]:.2f}%, " + f"5m:{values[2]:.2f}%, 10m:{values[3]:.2f}%" + ) + lines.append("Press any key to continue …") + + self._push_center_panel( + img_annots, + lines, + y_center=0.55, + width=0.80, + line_size=20, + line_gap=0.04, + accent_idx=set(accents.keys()), + accent_colors=accents, + ) + + # --- help panel --- + def _push_help_panel(self, img_annots: dai.ImgAnnotations, frame: dai.ImgFrame): + """ + Draws a compact help list without per-line background (keeps it readable). + """ + lines = [ + "DynamicCalibration — Key commands", + "[c] Calibration quality check", + "[r] Recalibrate", + "[C] Force calibration check", + "[R] Force recalibrate", + "[l] Load image", + "[n] Apply new calibration", + "[o] Apply old/previous", + "[p] Flash new calibration", + "[k] Flash old calibration", + "[f] Flash factory calibration", + "[g] Toggle depth HUD [h] Toggle help", + "[w]/[a]/[s] Move ROI [z]/[x] ROI size", + "", + "[h] Show/Hide Help Display", + "[q] Quit", + ] + + # Layout (smaller font, tighter spacing) + size = 18 + x = 0.04 # left + y0 = 0.075 # top + dy = 0.025 # line spacing (tight) + + hud = dai.ImgAnnotation() + # Title slightly larger + hud.texts.append( + self._create_text_annot( + lines[0], (x, y0), size=16, bg=(0, 0, 0, 0), fg=(1, 1, 1, 1) + ) + ) + y = y0 + dy * 1.2 + for line in lines[1:]: + hud.texts.append( + self._create_text_annot( + line, (x, y), size=size, bg=(0, 0, 0, 0), fg=(1, 1, 1, 1) + ) + ) + y += dy + + img_annots.annotations.append(hud) + img_annots.setTimestamp(frame.getTimestamp()) + + def _push_center_banner( + self, + img_annots: dai.ImgAnnotations, + text: str, + x0=0.25, + x1=0.75, + y_center=0.50, + band_h=0.14, + size=20, + ): + # Background slab built from spacer rows + slab = dai.ImgAnnotation() + rows = 12 + for i in range(rows): + y = y_center - band_h / 2 + i * (band_h / (rows - 1)) + slab.texts.append( + self._create_text_annot( + " ", (x0, y), size=size, bg=(0, 0, 0, 0.55), fg=(0, 0, 0, 0) + ) + ) + img_annots.annotations.append(slab) + + # Foreground text (center-left aligned with a small left margin) + txt = dai.ImgAnnotation() + txt.texts.append( + self._create_text_annot( + text, + (x0 + 0.02, y_center - 0.01), + size=size, + bg=(0, 0, 0, 0), + fg=(1, 1, 1, 1), + ) + ) + img_annots.annotations.append(txt) + + # --- existing HUD, but styled to match (no green) --- + def _push_status_overlay(self, img_annots: dai.ImgAnnotations, frame: dai.ImgFrame): + hud = dai.ImgAnnotation() + y, dy, x = 0.05, 0.04, 0.05 + + if self.old_calibration: + hud.texts.append( + self._create_text_annot("PREVIOUS calibration: available", (x, y)) + ) + y += dy + + if self.last_quality and getattr(self.last_quality, "qualityData", None): + q = self.last_quality.qualityData + rotmag = float( + np.sqrt( + q.rotationChange[0] ** 2 + + q.rotationChange[1] ** 2 + + q.rotationChange[2] ** 2 + ) + ) + hud.texts.append( + self._create_text_annot(f"Δrot = {rotmag:.3f}°", (0.55, 0.05)) + ) + hud.texts.append( + self._create_text_annot( + f"Sampson mean: new={q.sampsonErrorNew:.3f}px current={q.sampsonErrorCurrent:.3f}px", + (0.55, 0.09), + ) + ) + + # Small hint to show help + hud.texts.append( + self._create_text_annot( + "[h] Help", (0.05, 0.92), size=14, bg=(0, 0, 0, 0.35) + ) + ) + img_annots.annotations.append(hud) + img_annots.setTimestamp(frame.getTimestamp()) + + def _push_overlay(self, frame: dai.ImgFrame): + img_annots = dai.ImgAnnotations() + + # Always show help panel + if self._show_help: + self._push_help_panel(img_annots, frame) + + now = time.time() + + # If 'l' timer elapsed, stop timed collecting + if ( + self._collecting + and self._collecting_until > 0.0 + and now >= self._collecting_until + ): + self._collecting = False + self._collecting_until = 0.0 + + # Coverage bar: show while collecting (respecting timer) or when partial progress exists + show_cov = ( + self._collecting + and (self._collecting_until == 0.0 or now < self._collecting_until) + ) or (self._coverage_vec is not None and 0.0 < self._coverage_pct < 100.0) + + # Flash banner (2s) if any + if self._status_text: + if now < self._status_expire_ts: + self._push_center_banner(img_annots, self._status_text, size=22) + else: + self._status_text = None + self._status_expire_ts = 0.0 + + elif show_cov: + # Centered, bold coverage bar (no background slab) + x0, x1 = 0.24, 0.67 + y_center = 0.51 + + bar = self._coverage_bar_text(self._coverage_pct, width_chars=36) + txt = f"Collecting frames {self._coverage_pct:5.1f}% {bar}" + + banner = dai.ImgAnnotation() + banner.texts.append( + self._create_text_annot( + txt, + (x0 + 0.02, y_center - 0.02), + size=22, + bg=(0, 0, 0, 0), + fg=(1, 1, 1, 1), + ) + ) + img_annots.annotations.append(banner) + + # When no banner, no coverage bar, and no modal → draw the depth HUD + no_banner = not (self._status_text and now < self._status_expire_ts) + no_modal = not (self._modal_kind and now < self._modal_expire_ts) + if no_banner and not show_cov and no_modal: + hud = dai.ImgAnnotation() + try: + self._draw_depth_hud(hud, frame.getWidth(), frame.getHeight()) + except Exception: + pass + if len(hud.texts) > 0: + img_annots.annotations.append(hud) + + # Small metrics (top-right) only when no modal is up + if not (self._modal_kind and time.time() < self._modal_expire_ts): + if self.last_quality and getattr(self.last_quality, "qualityData", None): + qhud = dai.ImgAnnotation() + q = self.last_quality.qualityData + rotmag = float( + np.sqrt( + q.rotationChange[0] ** 2 + + q.rotationChange[1] ** 2 + + q.rotationChange[2] ** 2 + ) + ) + qhud.texts.append( + self._create_text_annot( + f"Δrot = {rotmag:.3f}°", + (0.62, 0.06), + size=16, + bg=(0, 0, 0, 0.35), + ) + ) + qhud.texts.append( + self._create_text_annot( + f"Sampson mean: new={q.sampsonErrorNew:.3f}px current={q.sampsonErrorCurrent:.3f}px", + (0.62, 0.10), + size=16, + bg=(0, 0, 0, 0.35), + ) + ) + img_annots.annotations.append(qhud) + + elif self._last_calib_diff is not None: + q = self._last_calib_diff + rotmag = float( + np.sqrt( + q.rotationChange[0] ** 2 + + q.rotationChange[1] ** 2 + + q.rotationChange[2] ** 2 + ) + ) + hud_metrics = dai.ImgAnnotation() + hud_metrics.texts.append( + self._create_text_annot( + f"Δrot(new vs current) = {rotmag:.3f}°", + (0.62, 0.06), + size=16, + bg=(0, 0, 0, 0.35), + ) + ) + hud_metrics.texts.append( + self._create_text_annot( + f"Sampson: new={q.sampsonErrorNew:.3f}px current={q.sampsonErrorCurrent:.3f}px", + (0.62, 0.10), + size=16, + bg=(0, 0, 0, 0.35), + ) + ) + d = getattr(q, "depthErrorDifference", None) + if d and len(d) >= 4: + hud_metrics.texts.append( + self._create_text_annot( + f"Depth Δ @1/2/5/10m: {d[0]:.2f}% / {d[1]:.2f}% / {d[2]:.2f}% / {d[3]:.2f}%", + (0.62, 0.14), + size=16, + bg=(0, 0, 0, 0.35), + ) + ) + img_annots.annotations.append(hud_metrics) + + # Modals (centered, large) — drawn last so they’re on top + now = time.time() + if self._modal_kind and now < self._modal_expire_ts: + if self._modal_kind == "recalib": + self._push_recalibration_modal( + img_annots, + self._modal_payload.get("values"), + self._modal_payload.get("angles"), + ) + elif self._modal_kind == "quality": + self._push_quality_bar_modal( + img_annots, + self._modal_payload.get("values"), + self._modal_payload.get("rotation"), + self._modal_payload.get("text", ""), + ) + elif self._modal_kind and now >= self._modal_expire_ts: + self._modal_kind = None + self._modal_expire_ts = 0.0 + self._modal_payload = {} + + self.out_annotations.send(img_annots) + + def process(self, frame: dai.ImgFrame): + # 0) drain depth/disparity input so HUD has up-to-date pixels + if self.in_depth is not None: + while True: + dmsg = self.in_depth.tryGet() + if dmsg is None: + break + try: + # store raw array (uint16 depth in mm, or disparity) + self._last_depth = dmsg.getFrame() + except Exception: + pass + + # 1) drain quality queue + if self._quality_q is not None: + while True: + msg = self._quality_q.tryGet() + if msg is None: + break + + info = getattr(msg, "info", None) + if info: + print(f"Dynamic calibration status: {info}") + + qd = getattr(msg, "qualityData", None) + if qd: + self.last_quality = msg + self._status_text = None + self._status_expire_ts = 0.0 + # Show quality modal (health bar) + self._modal_kind = "quality" + self._modal_payload = { + "values": getattr(qd, "depthErrorDifference", None), + "rotation": getattr(qd, "rotationChange", None), + "text": "", + } + self._modal_expire_ts = time.time() + 3.5 + else: + self._status_text = ( + "Data is missing — please load more images with 'l'." + ) + self._status_expire_ts = time.time() + 2.5 + + # 2) drain calibrationOutput queue + if self._calib_q is not None: + while True: + msg = self._calib_q.tryGet() + if msg is None: + break + cd = getattr(msg, "calibrationData", None) + if not cd: + continue + + # stop collecting when calibration finishes + self._collecting = False + + # store new calibration + diff + self.new_calibration = getattr(cd, "newCalibration", None) + self._last_calib_diff = getattr(cd, "calibrationDifference", None) + + # console messages + print("Successfully calibrated") + if self.new_calibration is not None: + print(f"New calibration: {self.new_calibration}") + + # auto-apply if enabled + if self.auto_apply_new and self.new_calibration is not None: + try: + self._send_cmd( + dai.DynamicCalibrationControl.Commands.ApplyCalibration( + self.new_calibration + ) + ) + self.old_calibration, self.calibration = ( + self.calibration, + self.new_calibration, + ) + self.new_calibration = None + except Exception as e: + print(f"Failed to apply new calibration: {e}") + # after applying, you may want to reset device-side buffers + self._send_cmd(dai.DynamicCalibrationControl.Commands.ResetData()) + + # pretty print the difference metrics (if available) + q = self._last_calib_diff + if q is not None: + rotmag = float( + np.sqrt( + q.rotationChange[0] ** 2 + + q.rotationChange[1] ** 2 + + q.rotationChange[2] ** 2 + ) + ) + print( + f"Rotation difference: || r_current - r_new || = {rotmag} deg" + ) + print(f"Mean Sampson error achievable = {q.sampsonErrorNew} px ") + print(f"Mean Sampson error current = {q.sampsonErrorCurrent} px") + d = getattr(q, "depthErrorDifference", None) + if d and len(d) >= 4: + print( + "Theoretical Depth Error Difference " + f"@1m:{d[0]:.2f}%, 2m:{d[1]:.2f}%, 5m:{d[2]:.2f}%, 10m:{d[3]:.2f}%" + ) + + # Show recalibration modal with values + Euler angles + self._modal_kind = "recalib" + self._modal_payload = { + "values": getattr(q, "depthErrorDifference", None), + "angles": getattr(q, "rotationChange", None), + } + self._modal_expire_ts = time.time() + 3.5 + + # 3) drain coverage (device -> host) + if self._coverage_q is not None: + while True: + msg = self._coverage_q.tryGet() + if msg is None: + break + + # coveragePerCellA: list/array of cell coverages in [0..1] or [0..100] + cvec = getattr(msg, "coveragePerCellA", None) + if cvec is not None: + try: + arr = np.asarray(cvec, dtype=np.float32).ravel() + if arr.size > 0: + self._coverage_vec = arr + # auto-detect units + mx = float(np.nanmax(arr)) + pct_from_cells = float( + np.nanmean(arr) * (100.0 if mx <= 1.01 else 1.0) + ) + self._coverage_pct = pct_from_cells + except Exception: + pass + + # optional: dataAcquired (0..1 or 0..100) + da = getattr(msg, "dataAcquired", None) + if da is not None: + try: + val = float(da) + self._data_acquired = val + if 0.0 <= val <= 1.0: + pct_from_da = val * 100.0 + else: + pct_from_da = val + # take the max so bar never goes backwards + self._coverage_pct = max(self._coverage_pct, pct_from_da) + except Exception: + pass + + # 4) normal HUD push + self._push_overlay(frame) + + # ---- public helpers ---- + def set_current_calibration(self, calib: Optional[dai.CalibrationHandler]): + self.calibration = calib + + def on_new_calibration(self, calib: dai.CalibrationHandler): + self.new_calibration = calib + + # ---- key handling ---- + def handle_key_press(self, key: int): + # dismiss modal on any key + if self._modal_kind is not None: + self._modal_kind = None + self._modal_expire_ts = 0.0 + self._modal_payload = {} + + # commands + if key == ord("q"): + self.wants_quit = True + return + + if key == ord("r"): + self._collecting_until = 0.0 + self._collecting = True + self._coverage_vec = None + self._coverage_pct = 0.0 + self._data_acquired = 0.0 + self._last_calib_diff = None + self.last_quality = None + self._send_cmd(dai.DynamicCalibrationControl.Commands.StartCalibration()) + return + + if key == ord("R"): + self._collecting = False + self._coverage_vec = None + self._coverage_pct = 0.0 + self._data_acquired = 0.0 + self._last_calib_diff = None + self.last_quality = None + self._send_cmd(dai.DynamicCalibrationControl.Commands.Calibrate(force=True)) + return + + if key == ord("l"): + self._collecting_until = time.time() + 2.0 # show bar only for 2s + self._collecting = False + self._coverage_vec = None + self._coverage_pct = 0.0 + self._data_acquired = 0.0 + self._last_calib_diff = None + self.last_quality = None + self._flash_status("Loading images… move the rig to collect frames.", 2.0) + self._send_cmd(dai.DynamicCalibrationControl.Commands.LoadImage()) + return + + if key == ord("n") and self.new_calibration: + self._send_cmd( + dai.DynamicCalibrationControl.Commands.ApplyCalibration( + self.new_calibration + ) + ) + self.old_calibration, self.calibration = ( + self.calibration, + self.new_calibration, + ) + self.new_calibration = None + self._flash_status("Applying NEW calibration…", 2.0) + return + + if key == ord("o") and self.old_calibration: + self._send_cmd( + dai.DynamicCalibrationControl.Commands.ApplyCalibration( + self.old_calibration + ) + ) + self.new_calibration, self.calibration, self.old_calibration = ( + self.calibration, + self.old_calibration, + None, + ) + self._flash_status("Reverting to PREVIOUS calibration…", 2.0) + return + + if key == ord("c"): + self._collecting = False + self._coverage_vec = None + self._coverage_pct = 0.0 + self._data_acquired = 0.0 + self._last_calib_diff = None + self.last_quality = None + self._send_cmd(dai.DynamicCalibrationControl.Commands.CalibrationQuality()) + return + + if key == ord("C"): + self._collecting = False + self._coverage_vec = None + self._coverage_pct = 0.0 + self._data_acquired = 0.0 + self._last_calib_diff = None + self.last_quality = None + self._send_cmd( + dai.DynamicCalibrationControl.Commands.CalibrationQuality(force=True) + ) + return + + if key == ord("h"): + self._show_help = not self._show_help + return + + # depth HUD controls + if key in (ord("g"), ord("G")): + self._hud_on = not self._hud_on + return + + if self._last_depth is not None: + step = max(1, int(0.01 * self._last_depth.shape[1])) # ~1% width per tap + cx, cy = ( + self._cursor_xy + if self._cursor_xy is not None + else (self._last_depth.shape[1] // 2, self._last_depth.shape[0] // 2) + ) + if key in (ord("a"), ord("A")): + cx -= step + if key in (ord("d"), ord("D")): + cx += step + if key in (ord("w"), ord("W")): + cy -= step + if key in (ord("s"), ord("S")): + cy += step + if key in (ord("z"), ord("Z")): + self._roi_half = max(1, self._roi_half - 1) + if key in (ord("x"), ord("X")): + self._roi_half += 1 + H, W = self._last_depth.shape[:2] + self._cursor_xy = (int(np.clip(cx, 0, W - 1)), int(np.clip(cy, 0, H - 1))) + + # --- FLASH to EEPROM --- + if key == ord("p"): + # Flash NEW (or current) calibration + if self._device is None: + self._flash_status("No device bound — cannot flash.", 2.0) + return + calib_to_flash = self.new_calibration or self.calibration + if calib_to_flash is None: + self._flash_status("No NEW/current calibration to flash.", 2.0) + return + try: + self._device.flashCalibration(calib_to_flash) + self._flash_status("Flashed NEW/current calibration to EEPROM.", 2.0) + except Exception as e: + self._flash_status(f"Flash failed: {e}", 2.5) + return + + if key == ord("k"): + # Flash PREVIOUS calibration + if self._device is None: + self._flash_status("No device bound — cannot flash.", 2.0) + return + if self.old_calibration is None: + self._flash_status("No PREVIOUS calibration to flash.", 2.0) + return + try: + self._device.flashCalibration(self.old_calibration) + self._flash_status("Flashed PREVIOUS calibration to EEPROM.", 2.0) + except Exception as e: + self._flash_status(f"Flash failed: {e}", 2.5) + return + + if key == ord("f"): + # Flash FACTORY calibration + if self._device is None: + self._flash_status("No device bound — cannot flash.", 2.0) + return + try: + factory = self._device.readFactoryCalibration() + self._device.flashCalibration(factory) + self._flash_status("Flashed FACTORY calibration to EEPROM.", 2.0) + except Exception as e: + self._flash_status(f"Flash failed: {e}", 2.5) + return diff --git a/dynamic-calibration/utils/helper_functions.py b/dynamic-calibration/utils/helper_functions.py new file mode 100644 index 000000000..c9d3f9bc7 --- /dev/null +++ b/dynamic-calibration/utils/helper_functions.py @@ -0,0 +1,586 @@ +import cv2 +import numpy as np + + +def draw_recalibration_message(image, values, angles): + width, height = image.shape[1], image.shape[0] + if values == []: + lines = [] + lines.append("Data is missing please load more images with 'l'.") + lines.append("Press any key to continue ...") + else: + # --- Determine message content --- + threshold = 0.075 # degrees + axis_names = ["Roll", "Pitch", "Yaw"] + over_threshold = [ + i for i, angle in enumerate(np.abs(angles)) if angle > threshold + ] + + lines = [] + lines.append("Recalibration complete") + + if over_threshold: + axes = ", ".join([axis_names[i] for i in over_threshold]) + lines.append(f"Significant change in rotation! {axes}") + lines.append("To permanently flash new calibration, press 's'!") + else: + lines.append("No significant change detected") + + lines.append( + f"Euler angles (deg): Roll={angles[0]:.2f}, Pitch={angles[1]:.2f}, Yaw={angles[2]:.2f}" + ) + lines.append( + f"Depth error @1m:{values[0]:.2f}%, 2m:{values[1]:.2f}%, 5m:{values[2]:.2f}%, 10m:{values[3]:.2f}%" + ) + lines.append("Press any key to continue ...") + + # --- Text layout parameters --- + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1.2 + thickness = 2 + line_spacing = 25 + line_height = int( + cv2.getTextSize("Test", font, font_scale, thickness)[0][1] + line_spacing + ) + + # Calculate full box height and max width + text_sizes = [ + cv2.getTextSize(line, font, font_scale, thickness)[0] for line in lines + ] + box_width = max([size[0] for size in text_sizes]) + 40 + box_height = line_height * len(lines) + 20 + + # Box position (centered) + box_x = (width - box_width) // 2 + box_y = (height - box_height) // 2 + + # Draw semi-transparent background box + overlay = image.copy() + cv2.rectangle( + overlay, (box_x, box_y), (box_x + box_width, box_y + box_height), (0, 0, 0), -1 + ) + alpha = 0.5 + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # --- Draw text lines --- + current_y = box_y + 30 + for i, line in enumerate(lines): + color = (255, 255, 255) + if "Significant change" in line: + color = (0, 0, 255) + elif "Recalibration complete" in line: + color = (0, 255, 0) + + text_size = cv2.getTextSize(line, font, font_scale, thickness)[0] + text_x = box_x + (box_width - text_size[0]) // 2 + cv2.putText( + image, line, (text_x, current_y), font, font_scale, color, thickness + ) + current_y += line_height + + return image + + +def display_text(image, text): + font_scale = 1.0 + width, height = image.shape[1], image.shape[0] + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 5)[0] + text_width, text_height = text_size + + # Define box dimensions + box_padding = 10 + box_width = text_width + box_padding * 2 + box_height = text_height + box_padding * 2 + box_x = (width - box_width) // 2 + box_y = (height - box_height) // 2 + + # Draw semi-transparent background box + overlay = image.copy() + box_start = (box_x, box_y - 50) + box_end = (box_x + box_width, box_y + box_height - 50) + alpha = (0, 0, 0, 128)[3] / 255.0 + cv2.rectangle(overlay, box_start, box_end, (0, 0, 0, 128)[:3], -1) + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw text at the center of the box + text_x = box_x + (box_width - text_width) // 2 + text_y = box_y + (box_height + text_height) // 2 + font_scale = 1.0 + cv2.putText( + image, + text, + (text_x, text_y - 50), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + (255, 255, 255), + 2, + ) + + +def draw_health_bar(image, values, rotation, display_text=""): + if rotation == []: + font_scale = 1.0 + width, height = image.shape[1], image.shape[0] + text = "Data is missing please load more images with 'l'." + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 5)[0] + text_width, text_height = text_size + + # Define box dimensions + box_padding = 10 + box_width = text_width + box_padding * 2 + box_height = text_height + box_padding * 2 + box_x = (width - box_width) // 2 + box_y = (height - box_height) // 2 + + # Draw semi-transparent background box + overlay = image.copy() + box_start = (box_x, box_y - 50) + box_end = (box_x + box_width, box_y + box_height - 50) + alpha = (0, 0, 0, 128)[3] / 255.0 + cv2.rectangle(overlay, box_start, box_end, (0, 0, 0, 128)[:3], -1) + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw text at the center of the box + text_x = box_x + (box_width - text_width) // 2 + text_y = box_y + (box_height + text_height) // 2 + font_scale = 1.0 + cv2.putText( + image, + text, + (text_x, text_y - 50), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + (255, 255, 255), + 2, + ) + + else: # Normalize the value for display (clamp between 0 and 40) + normalized_value = min(max(rotation), 0.2) + + # Bar parameters + width, height = image.shape[1], image.shape[0] + bar_width = int(width * 0.8) + bar_height = 50 + bar_x = (width - bar_width) // 2 + bar_y = (height - bar_height) // 2 # Center the bar vertically + + # Define labels and colors + labels = ["GOOD", "COULD BE IMPROVED", "NEEDS RECALIBRATION"] + colors = [(0, 255, 0), (0, 255, 255), (0, 0, 255)] # Green, Yellow, Red + + # Divide the bar into 3 equal sections + section_width = bar_width // 3 + + for i in range(3): + start_x = bar_x + i * section_width + end_x = start_x + section_width + color = colors[i] + + # Draw the colored section + cv2.rectangle( + image, (start_x, bar_y), (end_x, bar_y + bar_height), color, -1 + ) + + # Add the label inside the section + font_scale = 0.8 + thickness = 3 + text_size = cv2.getTextSize( + labels[i], cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness + )[0] + text_x = start_x + (section_width - text_size[0]) // 2 + text_y = bar_y + (bar_height + text_size[1]) // 2 + cv2.putText( + image, + labels[i], + (text_x, text_y), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + (0, 0, 0), + thickness, + ) + + # Map the value to the position within the bar + if normalized_value <= 0.07: + pointer_x = int(bar_x + (normalized_value / 0.07) * section_width) + elif normalized_value <= 0.15: + pointer_x = int( + bar_x + + section_width + + ((normalized_value - 0.07) / 0.07) * section_width + ) + else: + pointer_x = int( + bar_x + + 2 * section_width + + ((normalized_value - 0.15) / 0.15) * section_width + ) + pointer_y_top = bar_y - 10 + pointer_y_bottom = bar_y + bar_height + 10 + cv2.line( + image, + (pointer_x, pointer_y_top), + (pointer_x, pointer_y_bottom), + (0, 0, 0), + 2, + ) + + # Add the numerical value above the pointer + text = f"Depth error changes at 1m->{values[0]:.2f}%, 2m->{values[1]:.2f}%, 5m->{values[2]:.2f}%, 10m->{values[3]:.2f}%" + font_scale = 0.9 + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 2)[0] + text_width, text_height = text_size + + # Define box dimensions + box_padding = 10 + box_width = text_width + box_padding * 2 + box_height = text_height + box_padding * 2 + box_x = (width - box_width) // 2 + box_y = (height - box_height) // 2 + + # Draw semi-transparent background box + overlay = image.copy() + box_start = (box_x, box_y + -100) + box_end = (box_x + box_width, box_y + box_height - 100) + alpha = (0, 0, 0, 128)[3] / 255.0 + cv2.rectangle(overlay, box_start, box_end, (0, 0, 0, 128)[:3], -1) + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw text at the center of the box + text_x = box_x + (box_width - text_width) // 2 + text_y = box_y + (box_height + text_height) // 2 + cv2.putText( + image, + text, + (text_x, text_y - 100), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + (255, 255, 255), + 3, + ) + font_scale = 1.0 + # Draw the pointer + if display_text != "": + text = display_text + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 5)[ + 0 + ] + text_width, text_height = text_size + + # Define box dimensions + box_padding = 10 + box_width = text_width + box_padding * 2 + box_height = text_height + box_padding * 2 + box_x = (width - box_width) // 2 + box_y = (height - box_height) // 2 + + # Draw semi-transparent background box + overlay = image.copy() + box_start = (box_x, box_y - 50) + box_end = (box_x + box_width, box_y + box_height - 50) + alpha = (0, 0, 0, 128)[3] / 255.0 + cv2.rectangle(overlay, box_start, box_end, (0, 0, 0, 128)[:3], -1) + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw text at the center of the box + text_x = box_x + (box_width - text_width) // 2 + text_y = box_y + (box_height + text_height) // 2 + font_scale = 1.0 + cv2.putText( + image, + text, + (text_x, text_y - 50), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + (255, 255, 255), + 2, + ) + + text = "Press any key to continue ..." + text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 5)[0] + text_width, text_height = text_size + + # Define box dimensions + box_padding = 10 + box_width = text_width + box_padding * 2 + box_height = text_height + box_padding * 2 + box_x = (width - box_width) // 2 + box_y = (height - box_height) // 2 + + # Draw semi-transparent background box + overlay = image.copy() + box_start = (box_x, box_y + 50) + box_end = (box_x + box_width, box_y + box_height + 50) + alpha = (0, 0, 0, 128)[3] / 255.0 + cv2.rectangle(overlay, box_start, box_end, (0, 0, 0, 128)[:3], -1) + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw text at the center of the box + text_x = box_x + (box_width - text_width) // 2 + text_y = box_y + (box_height + text_height) // 2 + font_scale = 1.0 + cv2.putText( + image, + text, + (text_x, text_y + 50), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + (255, 255, 255), + 2, + ) + return image + + +def draw_progress_bar_with_percentage( + image, + progress, + coverage_check, + bar_color=(0, 255, 0), + bg_color=(50, 50, 50), + thickness=50, + bar_length_ratio=0.6, + font_scale=1, + font_color=(255, 255, 255), + text_bg_color=(0, 0, 0, 128), +): + progress = max(0, min(1, progress / 100)) + img_height, img_width = image.shape[:2] + + # Calculate bar dimensions + bar_length = int(img_width * bar_length_ratio) + bar_x = (img_width - bar_length) // 2 + bar_y = (img_height - thickness) // 2 + + # Draw background bar + start_point = (bar_x, bar_y) + end_point = (bar_x + bar_length, bar_y + thickness) + cv2.rectangle(image, start_point, end_point, bg_color, -1) + + # Draw filled bar + filled_length = int(bar_length * progress) + filled_end_point = (bar_x + filled_length, bar_y + thickness) + cv2.rectangle(image, start_point, filled_end_point, bar_color, -1) + + # Draw percentage text + percentage_text = f"{int(progress * 100)}%" + text_size = cv2.getTextSize( + percentage_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 2 + )[0] + text_x = bar_x + (bar_length - text_size[0]) // 2 + text_y = bar_y + (thickness + text_size[1]) // 2 + + # Add semi-transparent background for percentage text + overlay = image.copy() + text_bg_start = (text_x - 10, text_y - text_size[1] - 10) + text_bg_end = (text_x + text_size[0] + 10, text_y + 10) + cv2.rectangle(overlay, text_bg_start, text_bg_end, text_bg_color[:3], -1) + alpha = text_bg_color[3] / 255.0 + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw the text + cv2.putText( + image, + percentage_text, + (text_x, text_y), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + font_color, + 2, + ) + + # Draw "Coverage check" text below the progress bar + if not coverage_check: + coverage_text = "Collecting data ..." + else: + coverage_text = "Waiting for enough data ..." + coverage_text_size = cv2.getTextSize( + coverage_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 2 + )[0] + coverage_text_x = (img_width - coverage_text_size[0]) // 2 + coverage_text_y = bar_y + thickness + 20 + coverage_text_size[1] + + # Add semi-transparent background for coverage text + overlay = image.copy() + coverage_bg_start = ( + coverage_text_x - 10, + coverage_text_y - coverage_text_size[1] - 10, + ) + coverage_bg_end = ( + coverage_text_x + coverage_text_size[0] + 10, + coverage_text_y + 10, + ) + cv2.rectangle(overlay, coverage_bg_start, coverage_bg_end, text_bg_color[:3], -1) + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Draw the coverage check text + cv2.putText( + image, + coverage_text, + (coverage_text_x, coverage_text_y), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + font_color, + 2, + ) + + return image + + +def overlay_coverage_on_gray( + gray_image: np.ndarray, coverage_per_cell: list[list[float]], dataAquired: float +) -> np.ndarray: + """ + Overlay green coverage map on a grayscale image. + + Args: + gray_image: Grayscale input image (2D NumPy array). + coverage_per_cell: 2D list of float values in range [0.0, 1.0], defining coverage per cell. + + Returns: + Color image with green overlay per cell, proportional to coverage. + """ + # Convert grayscale to BGR + if len(gray_image.shape) != 3: + color_image = cv2.cvtColor(gray_image, cv2.COLOR_GRAY2BGR) + else: + color_image = gray_image + + rows = len(coverage_per_cell) + cols = len(coverage_per_cell[0]) + cell_width = color_image.shape[1] // cols + cell_height = color_image.shape[0] // rows + + for y in range(rows): + for x in range(cols): + coverage = coverage_per_cell[y][x] + if coverage <= 0.0: + continue + + alpha = 0.5 * min(1.0, max(0.0, coverage)) # Clamp between 0 and 1 + green = (0, 255, 0) # BGR + + x0 = x * cell_width + y0 = y * cell_height + x1 = x0 + cell_width + y1 = y0 + cell_height + + # Overlay green box with alpha blending + overlay = color_image.copy() + cv2.rectangle(overlay, (x0, y0), (x1, y1), green, thickness=cv2.FILLED) + color_image[y0:y1, x0:x1] = cv2.addWeighted( + overlay[y0:y1, x0:x1], alpha, color_image[y0:y1, x0:x1], 1 - alpha, 0 + ) + if dataAquired != 100: + check_coverage = False + else: + check_coverage = True + draw_progress_bar_with_percentage(color_image, dataAquired, check_coverage) + return color_image + + +def print_final_calibration_results(calib_quality, state: str): + rotation_change = getattr(calib_quality, "rotationChange", []) + depth_accuracy = getattr(calib_quality, "depthErrorDifference", []) + + print( + f"\n<<< -----------------------------|Final Results -- {state}|------------------------------------>>>" + ) + + # Handle rotation change + if rotation_change: + print( + "Rotation change[°]:", + " ".join(f"{float(val):.3f}" for val in rotation_change), + ) + else: + print("Rotation change[°]: N/A") + + # Handle depth accuracy + if depth_accuracy and len(depth_accuracy) >= 4: + print("Improvements if new calibration is applied (as float):") + print( + f"1m->{depth_accuracy[0]:.2f}%, \n2m->{depth_accuracy[1]:.2f}%, \n5m->{depth_accuracy[2]:.2f}%, \n10m->{depth_accuracy[3]:.2f}%" + ) + else: + print("Depth accuracy data unavailable or incomplete.") + + # Instructions + if state == "Recalibration": + print( + "New calibration has been applied, to apply the old one, press 'o'. To flash the new calibration, press 's'." + ) + else: + print("To continue with recalibration, press 'r'.") + + print( + "<<< -----------------------------|Finished|------------------------------------>>>\n" + ) + + +def draw_key_commands( + image, font_scale=1.1, color=(255, 255, 255), thickness=2, line_spacing=50 +): + """Draws key command info centered on the image with a semi-transparent full-frame background.""" + commands = [ + "DynamicCalibration mode, Key commands:", + "[c] Calibration quality check", + "[r] Recalibrate", + "[a] Force calibration check", + "[d] Force recalibrate", + "[l] Load image", + "[n] Apply new calibration", + "[o] Apply old calibration", + "[s] Flash new calibration", + "[k] Flash old calibration", + "[f] Flash factory calibration", + "[x] -> Save current frames.", + "[+/-] -> Adjust ROi size", + "[q] Quit", + ] + + height, width = image.shape[:2] + overlay = image.copy() + + # Draw semi-transparent black background over the entire frame + cv2.rectangle(overlay, (0, 0), (width, height), (0, 0, 0), -1) + alpha = 0.5 + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + # Calculate vertical centering + text_block_height = line_spacing * len(commands) + y_start = (height - text_block_height) // 2 + + for i, line in enumerate(commands): + text_size = cv2.getTextSize( + line, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness + )[0] + x = (width - text_size[0]) // 2 + y = y_start + i * line_spacing + cv2.putText( + image, + line, + (x, y), + cv2.FONT_HERSHEY_SIMPLEX, + font_scale, + color, + thickness, + lineType=cv2.LINE_AA, + ) + + +def update_master_frame( + leftFrame, rightFrame, disp_vis, fourthFrame, width=1280, height=800 +): + master_frame = np.zeros((height, width, 3), dtype=np.uint8) + + # Place subframes into master + master_frame[0:400, 0:640] = cv2.resize( + leftFrame, (width // 2, height // 2) + ) # Top-left + master_frame[0:400, 640:1280] = cv2.resize( + rightFrame, (width // 2, height // 2) + ) # Top-right + master_frame[400:800, 0:640] = cv2.resize( + disp_vis, (width // 2, height // 2) + ) # Bottom-left + master_frame[400:800, 640:1280] = cv2.resize(fourthFrame, (width // 2, height // 2)) + return master_frame