From 8aed6b6d92fd17f6cc77b51b7686522138902eae Mon Sep 17 00:00:00 2001 From: Sahit Chintalapudi Date: Fri, 17 Oct 2025 13:43:20 -0400 Subject: [PATCH 1/6] add a wiping skill --- exec_plan.py | 42 +++++++++---- skills/wipe.py | 157 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+), 12 deletions(-) create mode 100644 skills/wipe.py diff --git a/exec_plan.py b/exec_plan.py index 25ec313..d00ae5f 100644 --- a/exec_plan.py +++ b/exec_plan.py @@ -23,6 +23,7 @@ move_hand_to_relative_pose, open_gripper, ) +from skills.wipe import wipe_multiple_strokes from skills.spot_navigation import navigate_to_absolute_pose from spot_utils.perception.spot_cameras import capture_images from spot_utils.spot_localization import SpotLocalizer @@ -54,6 +55,15 @@ SPOT_ROOM_POSE: Dict[str, float] = dict() +def np_pose_to_SE3(X_RobEE: NDArray) -> math_helpers.SE3Pose: + return math_helpers.SE3Pose( + X_RobEE[0], + X_RobEE[1], + X_RobEE[2], + rot=math_helpers.Quat(X_RobEE[6], X_RobEE[3], X_RobEE[4], X_RobEE[5]), + ) + + def init(hostname: str, map_name: str, endpoint_url: Optional[str]) -> None: """Initialize the robot and the localizer.""" global LOCALIZER @@ -121,12 +131,7 @@ def grasp(text_prompt: Optional[str]) -> None: def grasp_at_pose(X_RobEE: NDArray) -> None: """Grasp an object at a specified pose relative to the robot.""" open_gripper(ROBOT) - pose = math_helpers.SE3Pose( - x=X_RobEE[0], - y=X_RobEE[1], - z=X_RobEE[2], - rot=math_helpers.Quat(X_RobEE[6], X_RobEE[3], X_RobEE[4], X_RobEE[5]), - ) + pose = np_pose_to_SE3(X_RobEE) move_hand_to_relative_pose(ROBOT, pose.mult(grasp_offset)) close_gripper(ROBOT) move_hand_to_relative_pose(ROBOT, DEFAULT_HAND_LOOK_FLOOR_POSE) @@ -134,17 +139,30 @@ def grasp_at_pose(X_RobEE: NDArray) -> None: def place_at_pose(X_RobEE: NDArray) -> None: """Place an object at a specified pose relative to the robot.""" - pose = math_helpers.SE3Pose( - x=X_RobEE[0], - y=X_RobEE[1], - z=X_RobEE[2], - rot=math_helpers.Quat(X_RobEE[6], X_RobEE[3], X_RobEE[4], X_RobEE[5]), - ) + pose = np_pose_to_SE3(X_RobEE) move_hand_to_relative_pose(ROBOT, pose.mult(grasp_offset)) open_gripper(ROBOT) move_hand_to_relative_pose(ROBOT, DEFAULT_HAND_LOOK_FLOOR_POSE) +def vertical_wipe( + X_RobEE_start: NDArray, stroke_dx: float, y_delta: float, num_strokes: int +) -> None: + """Wipes a surface at a given pose with known height and width..""" + start_pose = np_pose_to_SE3(X_RobEE_start) + wipe_multiple_strokes( + ROBOT, + start_pose, + start_pose, + stroke_dx=stroke_dx, + stroke_dy=0, + delta_x_y_between_strokes=(0, y_delta), + num_strokes=num_strokes, + duration_per_stroke=3.0, + num_attempts_per_stroke=1, + ) + + if __name__ == "__main__": # running this script standalone initializes a bosdyn robot and localizer. # It then executes the list of commands provided in the plan file diff --git a/skills/wipe.py b/skills/wipe.py new file mode 100644 index 0000000..74095e5 --- /dev/null +++ b/skills/wipe.py @@ -0,0 +1,157 @@ +"""Interface for spot sweeping skill.""" + +import argparse +import time +from typing import Tuple + +import numpy as np +from bosdyn.client import math_helpers +from bosdyn.client.sdk import Robot + +from skills.spot_hand_move import ( + close_gripper, + move_hand_to_relative_pose, + move_hand_to_relative_pose_with_velocity, + open_gripper, +) + + +def wipe_one_stroke( + robot: Robot, + wipe_start_pose: math_helpers.SE3Pose, + move_dx: float, + move_dy: float, + duration: float, +) -> None: + """Wipe a table surface in the xy plane. 
+ + The robot starts at a start pose, and then moves forward and back by + dx and dy. + """ + # Move first in the yz direction (perpendicular to robot body) to avoid + # knocking the target object over. + move_hand_to_relative_pose(robot, wipe_start_pose) + first_move_pose = math_helpers.SE3Pose( + x=wipe_start_pose.x + move_dx, # sensible default + y=wipe_start_pose.y + move_dy, + z=wipe_start_pose.z, + rot=wipe_start_pose.rot, + ) + move_hand_to_relative_pose_with_velocity( + robot, wipe_start_pose, first_move_pose, duration + ) + time.sleep(0.1) + # Move back to the start pose. + move_hand_to_relative_pose_with_velocity( + robot, first_move_pose, wipe_start_pose, duration + ) + + +def wipe_multiple_strokes( + robot: Robot, + wipe_start_pose: math_helpers.SE3Pose, + end_look_pose: math_helpers.SE3Pose, + stroke_dx: float, + stroke_dy: float, + delta_x_y_between_strokes: Tuple[float, float], + num_strokes: int, + duration_per_stroke: float, + num_attempts_per_stroke: int, +) -> None: + """Wipe a table surface in the xy plane. + + The robot starts at a start pose, and then moves forward and back by + dx and dy. + """ + curr_stroke_start_pose = wipe_start_pose + for i in range(num_strokes): + for i in range(num_attempts_per_stroke): + move_hand_to_relative_pose(robot, curr_stroke_start_pose) + first_move_pose = math_helpers.SE3Pose( + x=curr_stroke_start_pose.x + stroke_dx, + y=curr_stroke_start_pose.y + stroke_dy, + z=curr_stroke_start_pose.z, + rot=curr_stroke_start_pose.rot, + ) + move_hand_to_relative_pose_with_velocity( + robot, curr_stroke_start_pose, first_move_pose, duration_per_stroke + ) + time.sleep(0.1) + # Move back to the start pose. + move_hand_to_relative_pose_with_velocity( + robot, first_move_pose, curr_stroke_start_pose, duration_per_stroke + ) + # Move to the next stroke position. + curr_stroke_start_pose = math_helpers.SE3Pose( + x=curr_stroke_start_pose.x + delta_x_y_between_strokes[0], + y=curr_stroke_start_pose.y + delta_x_y_between_strokes[1], + z=curr_stroke_start_pose.z, + rot=curr_stroke_start_pose.rot, + ) + # Move to the end look pose. + move_hand_to_relative_pose(robot, end_look_pose) + + +if __name__ == "__main__": + # Run this file alone to test manually. + # Make sure to pass in --spot_robot_ip. + + # the robot opens its gripper and pauses + # until a brush is put in the gripper, with the bristles facing down and + # forward. The robot should then brush to the right. + + # pylint: disable=ungrouped-imports + from bosdyn.client import create_standard_sdk + from bosdyn.client.lease import LeaseClient + from bosdyn.client.util import authenticate + + from spot_utils.utils import verify_estop + + def _run_manual_test() -> None: + # Put inside a function to avoid variable scoping issues. + parser = argparse.ArgumentParser(description="Parse the robot's hostname.") + parser.add_argument( + "--hostname", + type=str, + required=True, + help="The robot's hostname/ip-address (e.g. 192.168.80.3)", + ) + args = parser.parse_args() + + # Get constants. + hostname = args.hostname + + sdk = create_standard_sdk("WipeSkillTestClient") + robot = sdk.create_robot(hostname) + authenticate(robot) + verify_estop(robot) + lease_client = robot.ensure_client(LeaseClient.default_service_name) + lease_client.take() + robot.time_sync.wait_for_sync() + + # Move the hand to the side. + hand_side_pose = math_helpers.SE3Pose( + x=0.80, y=0.0, z=0.25, rot=math_helpers.Quat.from_yaw(-np.pi / 2) + ) + move_hand_to_relative_pose(robot, hand_side_pose) + # Ask for the eraser. 
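+        # For reference, a rough sketch of how wipe_multiple_strokes (defined
+        # above) sweeps the surface with the parameters passed to it at the end
+        # of this test: start pose (x=0.85, y=-0.20), stroke (dx=0.0, dy=0.4),
+        # delta between strokes (0.05, 0.0), num_strokes=5. Each stroke moves
+        # the hand from y=-0.20 to y=+0.20 and back, then the start pose shifts
+        # 5 cm in +x, so the five strokes start at x = 0.85, 0.90, 0.95, 1.00,
+        # 1.05 and together cover roughly a 0.40 m x 0.20 m patch.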
+ open_gripper(robot) + msg = "Put the brush in the robot's gripper, then press enter" + input(msg) + close_gripper(robot) + + # NOTE: these parameters hardcoded for a particular child_play_table + # object njk is experimenting with. Please swap out depending on the + # actual object you have + start_pose = math_helpers.SE3Pose( + x=0.85, y=-0.2, z=-0.08, rot=math_helpers.Quat.from_pitch(np.pi / 2) + ) + end_pose = math_helpers.SE3Pose( + x=0.65, y=0.0, z=0.4, rot=math_helpers.Quat.from_pitch(np.pi / 2.5) + ) + # Execute the sweep. + wipe_multiple_strokes( + robot, start_pose, end_pose, 0.0, 0.4, (0.05, 0.0), 5, 1.0 + ) + + _run_manual_test() From 16b9e5966e221ae2c507cd1765af5d28fa1d659d Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Sat, 1 Nov 2025 17:13:34 -0400 Subject: [PATCH 2/6] added wipe skill parameterized by the bounding box coordinates of the detected spill --- skills/wipe.py | 2 +- skills/wipe_online.py | 676 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 677 insertions(+), 1 deletion(-) create mode 100644 skills/wipe_online.py diff --git a/skills/wipe.py b/skills/wipe.py index 74095e5..873ff86 100644 --- a/skills/wipe.py +++ b/skills/wipe.py @@ -151,7 +151,7 @@ def _run_manual_test() -> None: ) # Execute the sweep. wipe_multiple_strokes( - robot, start_pose, end_pose, 0.0, 0.4, (0.05, 0.0), 5, 1.0 + robot, start_pose, end_pose, 0.0, 0.4, (0.05, 0.0), 5, 1.0, 1 ) _run_manual_test() diff --git a/skills/wipe_online.py b/skills/wipe_online.py new file mode 100644 index 0000000..1e6afdf --- /dev/null +++ b/skills/wipe_online.py @@ -0,0 +1,676 @@ +import argparse +from typing import Optional +import json + +import numpy as np +from bosdyn.client import create_standard_sdk, math_helpers +from bosdyn.client.frame_helpers import ( + BODY_FRAME_NAME, + VISION_FRAME_NAME, + get_a_tform_b, +) +from bosdyn.client.lease import LeaseClient, LeaseKeepAlive +from bosdyn.client.sdk import Robot +from bosdyn.client.util import authenticate + +from spot_utils.perception.spot_cameras import capture_images +from spot_utils.spot_localization import SpotLocalizer +from spot_utils.utils import verify_estop, get_graph_nav_dir +from spot_utils.pretrained_model_interface import GoogleGeminiVLM +from skills.spot_hand_move import ( + move_hand_to_relative_pose, + move_hand_to_relative_pose_with_velocity, + open_gripper, + close_gripper, + stow_arm, +) +import rerun as rr +import cv2 +import os +from datetime import datetime +from PIL import Image +import open3d as o3d + +def init_robot(hostname: str, map_name: str) -> tuple[Robot, LeaseClient, LeaseKeepAlive, SpotLocalizer]: + sdk = create_standard_sdk("WipeOnlineClient") + robot = sdk.create_robot(hostname) + authenticate(robot) + verify_estop(robot) + lease_client = robot.ensure_client(LeaseClient.default_service_name) + lease_client.take() + lease_keepalive = LeaseKeepAlive( + lease_client, must_acquire=True, return_at_exit=True + ) + robot.time_sync.wait_for_sync() + + # Initialize localizer + path = get_graph_nav_dir(map_name) + localizer = SpotLocalizer(robot, path, lease_client, lease_keepalive) + localizer.localize() + print("[INFO] Localization successful.") + + return robot, lease_client, lease_keepalive, localizer + +DEFAULT_HAND_LOOK_FLOOR_POSE = math_helpers.SE3Pose( + x=0.80, y=0.0, z=0.25, rot=math_helpers.Quat.from_pitch(np.pi / 3) +) + +DEFAULT_HAND_LOOK_STRAIGHT_DOWN_POSE = math_helpers.SE3Pose( + x=0.80, y=0.0, z=0.25, rot=math_helpers.Quat.from_pitch(np.pi / 2) +) + +direction_to_pose = { + "DOWN": 
DEFAULT_HAND_LOOK_STRAIGHT_DOWN_POSE, + "AHEAD": DEFAULT_HAND_LOOK_FLOOR_POSE, +} + + +def visualize_bbox_normalized(image_path, bbox_norm, color=(0, 255, 0), thickness=2): + """ + img: HxWx3 uint8 (BGR) + bbox_norm: [ymin, xmin, ymax, xmax] with each in [0, 1000] + """ + img = cv2.imread(image_path) + H, W = img.shape[:2] + ymin, xmin, ymax, xmax = map(float, bbox_norm) + + # Scale normalized [0,1000] → pixels + x1 = int(np.clip(xmin * W / 1000.0, 0, W - 1)) + y1 = int(np.clip(ymin * H / 1000.0, 0, H - 1)) + x2 = int(np.clip(xmax * W / 1000.0, 0, W - 1)) + y2 = int(np.clip(ymax * H / 1000.0, 0, H - 1)) + + # Ensure non-degenerate box + if x2 <= x1: x2 = min(x1 + 1, W - 1) + if y2 <= y1: y2 = min(y1 + 1, H - 1) + + # Rectangle + img_out = img.copy() + cv2.rectangle(img_out, (x1, y1), (x2, y2), color, thickness) + + output_path = image_path.replace(".png", "_annotated.png").replace(".jpg", "_annotated.jpg") + cv2.imwrite(output_path, img_out) + + return img_out + +def draw_bounding_box(image_path, bbox_pixels, color=(0, 255, 0), thickness=2): + """ + Draw a bounding box using pixel coordinates directly (no normalization). + + Args: + image_path (str): Path to the image file. + bbox_pixels (list|tuple): [ymin, xmin, ymax, xmax] in pixel units. + color (tuple): BGR color for the rectangle. + thickness (int): Line thickness. + + Returns: + The annotated image (numpy array, BGR). + """ + img = cv2.imread(image_path) + if img is None: + raise FileNotFoundError(f"Could not read image from {image_path}") + + H, W = img.shape[:2] + ymin, xmin, ymax, xmax = map(int, bbox_pixels) + + # Clamp to image bounds + x1 = max(0, min(xmin, W - 1)) + y1 = max(0, min(ymin, H - 1)) + x2 = max(0, min(xmax, W - 1)) + y2 = max(0, min(ymax, H - 1)) + + # Ensure non-degenerate box + if x2 <= x1: + x2 = min(x1 + 1, W - 1) + if y2 <= y1: + y2 = min(y1 + 1, H - 1) + + img_out = img.copy() + cv2.rectangle(img_out, (x1, y1), (x2, y2), color, thickness) + + output_path = image_path.replace(".png", "_annotated.png").replace(".jpg", "_annotated.jpg") + cv2.imwrite(output_path, img_out) + + # return img_out + return output_path + +def move_hand_to_bbox_bottom_right( + robot: Robot, + rgbd, + bbox_pixels: list[int], + z_clearance_m: float = 0.01, +): + """ + Move the hand to the 3D point corresponding to the bbox bottom-right pixel. + bbox_pixels: [ymin, xmin, ymax, xmax] in pixel units. 
+ """ + ymin, xmin, ymax, xmax = bbox_pixels + u, v = int(xmax), int(ymax) # bottom-right pixel + + # Depth to meters + depth = rgbd.depth + depth_m = depth.astype(np.float32) / 1000.0 if depth.dtype == np.uint16 else depth.astype(np.float32) + + z = float(depth_m[v, u]) if 0 <= v < depth_m.shape[0] and 0 <= u < depth_m.shape[1] else 0.0 + if not np.isfinite(z) or z <= 0: + win = 3 + v0, v1 = max(0, v - win), min(depth_m.shape[0], v + win + 1) + u0, u1 = max(0, u - win), min(depth_m.shape[1], u + win + 1) + patch = depth_m[v0:v1, u0:u1] + valid = patch[np.isfinite(patch) & (patch > 0)] + if valid.size == 0: + raise RuntimeError("No valid depth at or near bbox bottom-right.") + z = float(np.median(valid)) + + cam = rgbd.camera_model.intrinsics + fx, fy = cam.focal_length.x, cam.focal_length.y + cx, cy = cam.principal_point.x, cam.principal_point.y + + x_cam = (u - cx) / fx * z + y_cam = (v - cy) / fy * z + p_cam_h = np.array([x_cam, y_cam, z, 1.0], dtype=np.float64) + + T_vision_cam = get_a_tform_b( + rgbd.transforms_snapshot, VISION_FRAME_NAME, rgbd.frame_name_image_sensor + ).to_matrix() + T_body_vision = get_a_tform_b( + rgbd.transforms_snapshot, BODY_FRAME_NAME, VISION_FRAME_NAME + ).to_matrix() + + p_body = (T_body_vision @ (T_vision_cam @ p_cam_h))[:3] + + target_pose = math_helpers.SE3Pose( + x=float(p_body[0]), + y=float(p_body[1]), + z=float(p_body[2] + z_clearance_m), + rot=math_helpers.Quat.from_pitch(np.pi / 2), + ) + + move_hand_to_relative_pose(robot, target_pose) + +def wipe_one_stroke( + robot: Robot, + wipe_start_pose: math_helpers.SE3Pose, + move_dx: float, + move_dy: float, + duration: float, +): + """ + Execute a single wipe stroke starting at wipe_start_pose and moving by + (move_dx, move_dy) in the BODY frame, then returning to the start. + """ + move_hand_to_relative_pose(robot, wipe_start_pose) + first_move_pose = math_helpers.SE3Pose( + x=wipe_start_pose.x + move_dx, + y=wipe_start_pose.y + move_dy, + z=wipe_start_pose.z, + rot=wipe_start_pose.rot, + ) + move_hand_to_relative_pose_with_velocity( + robot, wipe_start_pose, first_move_pose, duration + ) + # Return to start pose + move_hand_to_relative_pose_with_velocity( + robot, first_move_pose, wipe_start_pose, duration + ) + +def wipe_multiple_strokes( + robot: Robot, + wipe_start_pose: math_helpers.SE3Pose, + end_look_pose: math_helpers.SE3Pose, + stroke_dx: float, + stroke_dy: float, + delta_x_y_between_strokes: tuple[float, float], + num_strokes: int, + duration_per_stroke: float, + num_attempts_per_stroke: int, +): + """ + Execute multiple wipe strokes. After each stroke (and attempts) the start pose + is shifted by delta_x_y_between_strokes in BODY frame. 
+ """ + curr = wipe_start_pose + for _ in range(num_strokes): + for _ in range(num_attempts_per_stroke): + move_hand_to_relative_pose(robot, curr) + first_move_pose = math_helpers.SE3Pose( + x=curr.x + stroke_dx, + y=curr.y + stroke_dy, + z=curr.z, + rot=curr.rot, + ) + move_hand_to_relative_pose_with_velocity( + robot, curr, first_move_pose, duration_per_stroke + ) + # Return to start of this stroke + move_hand_to_relative_pose_with_velocity( + robot, first_move_pose, curr, duration_per_stroke + ) + # Shift to next stroke start + curr = math_helpers.SE3Pose( + x=curr.x + delta_x_y_between_strokes[0], + y=curr.y + delta_x_y_between_strokes[1], + z=curr.z, + rot=curr.rot, + ) + # End look pose + move_hand_to_relative_pose(robot, end_look_pose) + +def _pixel_to_body_xyz(u: int, v: int, rgbd) -> np.ndarray: + """Back-project a pixel (u, v) to BODY frame using rgbd intrinsics and transforms.""" + depth = rgbd.depth + depth_m = depth.astype(np.float32) / 1000.0 if depth.dtype == np.uint16 else depth.astype(np.float32) + if v < 0 or v >= depth_m.shape[0] or u < 0 or u >= depth_m.shape[1]: + raise ValueError("Pixel out of bounds") + z = float(depth_m[v, u]) + if not np.isfinite(z) or z <= 0: + # Small neighborhood fallback + win = 3 + v0, v1 = max(0, v - win), min(depth_m.shape[0], v + win + 1) + u0, u1 = max(0, u - win), min(depth_m.shape[1], u + win + 1) + patch = depth_m[v0:v1, u0:u1] + vals = patch[np.isfinite(patch) & (patch > 0)] + if vals.size == 0: + raise RuntimeError("No valid depth near pixel") + z = float(np.median(vals)) + cam = rgbd.camera_model.intrinsics + fx, fy = cam.focal_length.x, cam.focal_length.y + cx, cy = cam.principal_point.x, cam.principal_point.y + x_cam = (u - cx) / fx * z + y_cam = (v - cy) / fy * z + p_cam_h = np.array([x_cam, y_cam, z, 1.0], dtype=np.float64) + T_vision_cam = get_a_tform_b( + rgbd.transforms_snapshot, VISION_FRAME_NAME, rgbd.frame_name_image_sensor + ).to_matrix() + T_body_vision = get_a_tform_b( + rgbd.transforms_snapshot, BODY_FRAME_NAME, VISION_FRAME_NAME + ).to_matrix() + return (T_body_vision @ (T_vision_cam @ p_cam_h))[:3] + +def _compute_wipe_params_from_bbox( + rgbd, + bbox: list[int], + clearance: float = 0.015, + spacing_m: float = 0.05, + max_stroke_len: float = 0.35, +): + """ + From bbox [ymin,xmin,ymax,xmax] in pixels, compute: + - wipe_start_pose (at bottom-right corner + clearance) + - stroke_dx, stroke_dy (upwards along bbox height) + - delta_x_y_between_strokes (across bbox width) + - num_strokes (coverage based on spacing) + - end_look_pose (generic) + """ + ymin, xmin, ymax, xmax = bbox + p_br = (int(xmax), int(ymax)) + p_tr = (int(xmax), int(ymin)) + p_bl = (int(xmin), int(ymax)) + + P_br = _pixel_to_body_xyz(*p_br, rgbd) + P_tr = _pixel_to_body_xyz(*p_tr, rgbd) + P_bl = _pixel_to_body_xyz(*p_bl, rgbd) + + # Start pose + wipe_start_pose = math_helpers.SE3Pose( + x=float(P_br[0]), + y=float(P_br[1]), + z=float(P_br[2] + clearance), + rot=math_helpers.Quat.from_pitch(np.pi / 2), + ) + + # Stroke direction (up) + up_vec = P_tr - P_br + up_vec[2] = 0.0 + up_len = float(np.linalg.norm(up_vec[:2])) + if up_len < 1e-6: + up_len = 0.0 + up_dir = np.array([0.0, 0.0]) + else: + up_dir = up_vec[:2] / up_len + stroke_len = min(up_len, max_stroke_len) + stroke_dx = float(up_dir[0] * stroke_len) + stroke_dy = float(up_dir[1] * stroke_len) + + # Spacing across width (right -> left) + side_vec = P_bl - P_br + side_vec[2] = 0.0 + width_m = float(np.linalg.norm(side_vec[:2])) + if width_m > 1e-6: + side_dir = side_vec[:2] / width_m + else: + 
side_dir = np.array([0.0, 0.0]) + delta_x_y_between_strokes = (float(side_dir[0] * spacing_m), float(side_dir[1] * spacing_m)) + num_strokes = max(1, int(width_m / max(spacing_m, 1e-3))) + + end_look_pose = math_helpers.SE3Pose( + x=0.65, y=0.0, z=0.4, rot=math_helpers.Quat.from_pitch(np.pi / 2.5) + ) + + return ( + wipe_start_pose, + stroke_dx, + stroke_dy, + delta_x_y_between_strokes, + num_strokes, + end_look_pose, + ) + +def visualize_bbox_prediction(image_path, bbox): + """ + Draws a SCALED bounding box on an image using OpenCV and displays it. + + Args: + image_path (str): Path to the ORIGINAL image file. + bbox (list): Bounding box [ymin, xmin, ymax, xmax] from the model, + relative to the model's input size. + """ + try: + # --- Dimensions of the image the model processed --- + # (This is the key piece of information you were missing) + model_height = 682 + model_width = 910 + + # 1. Read the ORIGINAL image to get its true dimensions + img = cv2.imread(image_path) + if img is None: + print(f"Error: Could not read image from {image_path}") + return + + original_height, original_width, _ = img.shape + + # 2. Calculate scaling factors + y_scale = original_height / model_height + x_scale = original_width / model_width + + # 3. Unpack and scale the model's bounding box coordinates + ymin, xmin, ymax, xmax = bbox + + scaled_ymin = int(ymin * y_scale) + scaled_xmin = int(xmin * x_scale) + scaled_ymax = int(ymax * y_scale) + scaled_xmax = int(xmax * x_scale) + + # 4. Define points for the rectangle using SCALED coordinates + pt1 = (scaled_xmin, scaled_ymin) + pt2 = (scaled_xmax, scaled_ymax) + + # Define color (OpenCV uses BGR format, not RGB) + color_bgr = (0, 0, 255) # Red + thickness = 3 + + # 5. Draw the rectangle on the original image + cv2.rectangle(img, pt1, pt2, color_bgr, thickness) + + # 6. Save the annotated image + output_path = image_path.replace(".png", "_annotated.png").replace(".jpg", "_annotated.jpg") + cv2.imwrite(output_path, img) + + print(f"Successfully saved annotated image to: {output_path}") + return output_path + + except Exception as e: + print(f"An error occurred: {e}") + + +def get_bbox_from_gemini( + vlm_query_str: str, pil_image: Image.Image +) -> list[int]: + """ + Query Gemini VLM to get the bbox coordinates corresponding to the query. + + Args: + vlm_query_str: Prompt asking Gemini to identify the spill + pil_image: PIL Image to analyze + + Returns: + List of [ymin, xmin, ymax, xmax] in pixel coordinates + """ + # Ensure API key is set for Gemini + os.environ["GOOGLE_API_KEY"] = "AIzaSyDCgHTHeGFUe1DCeryLImqqjgPiAyUzw3k" + # vlm = GoogleGeminiVLM("gemini-2.5-flash-preview-05-20") + print(f'inside the function to get the bbox from gemini') + # vlm = GoogleGeminiVLM("gemini-2.5-flash") + # vlm = GoogleGeminiVLM("gemini-2.0-flash") + vlm = GoogleGeminiVLM("gemini-2.5-pro") + def _parse_bbox_list(raw: str) -> list[float]: + """Parse a bbox dict {"bbox": [ymin, xmin, ymax, xmax]} from model output. + Supports optional ```json fenced blocks. Returns raw numeric values + (assumed normalized 0-1000) without scaling. 
+ """ + s = raw.strip() + if "```" in s: + parts = s.split("```") + if len(parts) >= 2: + block = parts[1] + if block.startswith("json\n"): + block = "\n".join(block.splitlines()[1:]) + s = block.strip() + # Load JSON object + try: + obj = json.loads(s) + except Exception: + l, r = s.find("{"), s.rfind("}") + if l == -1 or r == -1 or r <= l: + raise ValueError("Could not find JSON object in model response.") + obj = json.loads(s[l:r + 1]) + + if not isinstance(obj, dict) or "bbox" not in obj: + raise ValueError("Expected a JSON object with key 'bbox'.") + bbox = obj["bbox"] + if not (isinstance(bbox, list) and len(bbox) == 4): + raise ValueError("'bbox' must be a list of 4 numbers [ymin, xmin, ymax, xmax].") + return [float(v) for v in bbox] + + # Query the VLM + print(f'vlm: {vlm}, the query string is: {vlm_query_str}') + vlm_output_list = vlm.sample_completions( + prompt=vlm_query_str, + imgs=[pil_image], + temperature=0.0, + seed=42, + num_completions=1, + ) + vlm_output_str = vlm_output_list[0] + print(f'vlm_output_str: {vlm_output_str}') + + # Parse bbox and convert from normalized [0-1000] to pixel coordinates + ymin_n, xmin_n, ymax_n, xmax_n = _parse_bbox_list(vlm_output_str) + img_height = pil_image.height + img_width = pil_image.width + ymin = int(round(ymin_n * img_height / 1000.0)) + xmin = int(round(xmin_n * img_width / 1000.0)) + ymax = int(round(ymax_n * img_height / 1000.0)) + xmax = int(round(xmax_n * img_width / 1000.0)) + + # Clamp to image bounds + ymin = max(0, min(ymin, img_height - 1)) + xmin = max(0, min(xmin, img_width - 1)) + ymax = max(0, min(ymax, img_height - 1)) + xmax = max(0, min(xmax, img_width - 1)) + + bbox = [ymin, xmin, ymax, xmax] + return bbox + + +def get_points_from_pixels(rgb_image_path, depth_image_path, intrinsics): + rgb = cv2.imread(rgb_image_path, cv2.IMREAD_COLOR) + depth = cv2.imread(depth_image_path, cv2.IMREAD_UNCHANGED) + + if rgb is None: + raise FileNotFoundError(f"Could not read RGB image at: {rgb_image_path}") + if depth is None: + raise FileNotFoundError(f"Could not read depth image at: {depth_image_path}") + + # Ensure single-channel depth + if depth.ndim == 3: + depth = cv2.cvtColor(depth, cv2.COLOR_BGR2GRAY) + + # Convert depth to meters if given as uint16 millimeters + if depth.dtype == np.uint16: + depth_m = depth.astype(np.float32) / 1000.0 + else: + depth_m = depth.astype(np.float32) + + h, w = depth_m.shape + if rgb.shape[:2] != (h, w): + rgb = cv2.resize(rgb, (w, h), interpolation=cv2.INTER_NEAREST) + + fx, fy, cx, cy = intrinsics[0], intrinsics[1], intrinsics[2], intrinsics[3] + + # Create pixel grid + u_coords, v_coords = np.meshgrid(np.arange(w, dtype=np.float32), np.arange(h, dtype=np.float32)) + + z = depth_m + valid = (z > 0) & (z <= 1.5) + + x = (u_coords - cx) / fx * z + y = (v_coords - cy) / fy * z + + # Stack and mask + points = np.stack((x, y, z), axis=-1)[valid] + if points.shape[0] == 0: + print("No points passed the depth filter! 
Check depth image units and max distance.") + + # Colors: convert BGR (cv2) to RGB and normalize to [0,1] + rgb_rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB) + colors = (rgb_rgb.reshape(-1, 3)[valid.ravel()] / 255.0).astype(np.float32) + print("positions:", points.shape, points.dtype) + print("colors:", colors.shape, colors.dtype) + return points, colors + + +def gaze(robot, direction: str) -> None: + """Move the hand to look in a certain direction.""" + look_pose = direction_to_pose[direction] + move_hand_to_relative_pose(robot, look_pose) + open_gripper(robot) + +def wipe_online(robot: Robot, lease_client: LeaseClient, lease_keepalive: LeaseKeepAlive, localizer: SpotLocalizer, vlm_query_template: str, z_offset: float) -> None: + # stow the arm + stow_arm(robot) + # have the robot look ahead to look at the spill + gaze(robot, "DOWN") + + # capture the image using the hand camera + rgbds = capture_images(robot, localizer, camera_names=["hand_color_image"]) + rgbd = rgbds["hand_color_image"] + cam_model = rgbd.camera_model + + fx = cam_model.intrinsics.focal_length.x + fy = cam_model.intrinsics.focal_length.y + cx = cam_model.intrinsics.principal_point.x + cy = cam_model.intrinsics.principal_point.y + + intrinsics = [fx, fy, cx, cy] + + ## extract the rgb and the depth image + rgb_img = rgbd.rgb + depth_img = rgbd.depth + # depth_m = depth_img.astype(np.float32) + # if depth_img.dtype == np.uint16: + # depth_m = depth_m / 1000.0 + + save_folderpath = "wipe_online_images" + os.makedirs(save_folderpath, exist_ok=True) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + ## save the rgb and the depth image to the disk + rgb_pil = Image.fromarray(rgb_img) + depth_pil = Image.fromarray(depth_img) + rgb_pil.save(os.path.join(save_folderpath, f"rgb_{timestamp}.png")) + depth_pil.save(os.path.join(save_folderpath, f"depth_{timestamp}.png")) + + points, colors = get_points_from_pixels(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), os.path.join(save_folderpath, f"depth_{timestamp}.png"), intrinsics) + # use the rgb image to detect the spill by querying the Gemini endpoint + # spill_pixel = get_pixel_from_gemini(vlm_query_template, rgb_pil) + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(points.astype(np.float32)) + pcd.colors = o3d.utility.Vector3dVector(colors.astype(np.float32)) + # o3d.visualization.draw_geometries([pcd]) + voxel_size = 0.005 + rr.log("3D_points", rr.Points3D(positions=points, colors=colors, radii=voxel_size/2)) + rr.log("rgb", rr.Image(rgb_img)) + rr.log("depth", rr.Image(depth_img)) + + ## call the gemini endpoint to get the bbox prediction corresponding to the spill +# vlm_query_template = """ +# Identify the spill/stain on the surface and provide a bounding box around it. +# The answer should follow the json format: {"bbox": [ymin, xmin, ymax, xmax], "label": "spill"}. +# The coordinates are in [ymin, xmin, ymax, xmax] format normalized to 0-1000. +# """ + # vlm_query_template = "I have an image with some text written on it, and I am interested in finding a bounding box for it. Can you give me the coordinates of the bounding box that encloses the written text?" 
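+    # For reference, a small worked example of the normalized -> pixel
+    # conversion performed inside get_bbox_from_gemini (hypothetical numbers):
+    # on a 640x480 image, a model bbox of [250, 100, 750, 900]
+    # ([ymin, xmin, ymax, xmax], normalized to 0-1000) becomes
+    # [round(250 * 480 / 1000), round(100 * 640 / 1000),
+    #  round(750 * 480 / 1000), round(900 * 640 / 1000)] = [120, 64, 360, 576]
+    # in pixel coordinates, which is what the wiping code below consumes.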
+ bbox = get_bbox_from_gemini(vlm_query_template, rgb_pil) + print(f"Bbox: {bbox}") + # output_path = visualize_bbox_prediction(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), bbox) + annotated_image_path = draw_bounding_box(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), bbox) + # Convert BGR (cv2) -> RGB before logging + # rr.log("bounding_box_image", rr.Image(cv2.cvtColor(annotated_img, cv2.COLOR_BGR2RGB))) + annotated_img = Image.open(annotated_image_path) + rr.log("annotated_image", rr.Image(annotated_img)) + ## move the hand to the bottom-right of the bounding box to verify position + move_hand_to_bbox_bottom_right(robot, rgbd, bbox, z_clearance_m=z_offset) ## this should move the hand to the bottom-right position + + ## once the hand has been moved to the bottom-right of the bounding box, + ## we need to start executing the parameterized skill for wiping the spill + + ( + wipe_start_pose, + stroke_dx, + stroke_dy, + delta_x_y_between_strokes, + num_strokes, + end_look_pose, + ) = _compute_wipe_params_from_bbox(rgbd, bbox, clearance=z_offset, spacing_m=0.05, max_stroke_len=0.35) + + # Example: run a single stroke first (uncomment to test) + # wipe_one_stroke(robot, wipe_start_pose, move_dx=stroke_dx, move_dy=stroke_dy, duration=1.0) + + # Run multi-stroke wipe + wipe_multiple_strokes( + robot=robot, + wipe_start_pose=wipe_start_pose, + end_look_pose=end_look_pose, + stroke_dx=stroke_dx, + stroke_dy=stroke_dy, + delta_x_y_between_strokes=delta_x_y_between_strokes, + num_strokes=num_strokes, + duration_per_stroke=1.0, + num_attempts_per_stroke=1, + ) + + ## once the bounding box coordinates have been successfully obtained + ## we need to place the arm in the right position (one of the corners of the bounding box) to clean the spill + ## we have the identified pixels/bb-coordinates in the camera frame (hand-camera) + +def main() -> None: + parser = argparse.ArgumentParser(description="Online wiping controller.") + parser.add_argument( + "--hostname", + type=str, + required=True, + help="Spot hostname/IP (e.g., 192.168.80.3)", + ) + parser.add_argument( + "--map_name", + type=str, + required=True, + help="The name of the map folder to load (sub-folder under graph_nav_maps)", + ) + parser.add_argument( + "--z_offset", + type=float, + default=0.02, + help="Hand Z offset above surface in meters (clearance).", + ) + args = parser.parse_args() + robot, lease_client, lease_keepalive, localizer = init_robot(args.hostname, args.map_name) + rr.init("wipe_online", spawn=True) + + vlm_query_template = """ +I have an image with some text written on it, and I am interested in finding a bounding box for it. Can you give me the coordinates of the bounding box that encloses the written text? +The answer should follow the json format: {"bbox": [ymin, xmin, ymax, xmax], "label": "spill"}. +The coordinates are in [ymin, xmin, ymax, xmax] format normalized to 0-1000.""" + wipe_online(robot, lease_client, lease_keepalive, localizer, vlm_query_template, args.z_offset) + +if __name__ == "__main__": + main() \ No newline at end of file From b974e2c1eda6bd4d131a9a44a560c9d9a821efa2 Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Sun, 2 Nov 2025 17:52:03 -0500 Subject: [PATCH 3/6] updates to the wipe online tool. 
Added function call to the exec_plan.py file --- exec_plan.py | 13 +- skills/wipe_online.py | 30 +- spot_utils/pretrained_model_interface.py | 449 +++++++++++++++++++++++ 3 files changed, 486 insertions(+), 6 deletions(-) create mode 100644 spot_utils/pretrained_model_interface.py diff --git a/exec_plan.py b/exec_plan.py index d00ae5f..652d6db 100644 --- a/exec_plan.py +++ b/exec_plan.py @@ -24,6 +24,7 @@ open_gripper, ) from skills.wipe import wipe_multiple_strokes +from skills.wipe_online import wipe_online as run_wipe_online from skills.spot_navigation import navigate_to_absolute_pose from spot_utils.perception.spot_cameras import capture_images from spot_utils.spot_localization import SpotLocalizer @@ -163,6 +164,16 @@ def vertical_wipe( ) +def wipe_at(*args, **kwargs) -> None: + """Run the online wipe skill using defaults from the skill module.""" + run_wipe_online( + ROBOT, + None, + None, + LOCALIZER, + ) + + if __name__ == "__main__": # running this script standalone initializes a bosdyn robot and localizer. # It then executes the list of commands provided in the plan file @@ -196,7 +207,7 @@ def vertical_wipe( SPOT_ROOM_POSE = metadata["spot-room-pose"] else: print("spot-room-pose not found in metadata.yaml, using default val") - SPOT_ROOM_POSE = {"x": 0.0, "y": 0.0, "z": 0.0} + SPOT_ROOM_POSE = {"x": 0.0, "y": 0.0, "angle": 0.0} with open(args.plan, "r") as plan_file: exec(plan_file.read()) print("done") diff --git a/skills/wipe_online.py b/skills/wipe_online.py index 1e6afdf..0dd7200 100644 --- a/skills/wipe_online.py +++ b/skills/wipe_online.py @@ -64,6 +64,15 @@ def init_robot(hostname: str, map_name: str) -> tuple[Robot, LeaseClient, LeaseK "AHEAD": DEFAULT_HAND_LOOK_FLOOR_POSE, } +DEFAULT_WIPE_ONLINE_Z_OFFSET = 0.02 +DEFAULT_WIPE_VLM_QUERY_TEMPLATE = ( + "I have an image with some text written on it, and I am interested in finding " + "a bounding box for it. Can you give me the coordinates of the bounding box " + "that encloses the written text? The answer should follow the json format: " + '{"bbox": [ymin, xmin, ymax, xmax], "label": "spill"}. ' + "The coordinates are in [ymin, xmin, ymax, xmax] format normalized to 0-1000." 
+) + def visualize_bbox_normalized(image_path, bbox_norm, color=(0, 255, 0), thickness=2): """ @@ -427,7 +436,6 @@ def get_bbox_from_gemini( List of [ymin, xmin, ymax, xmax] in pixel coordinates """ # Ensure API key is set for Gemini - os.environ["GOOGLE_API_KEY"] = "AIzaSyDCgHTHeGFUe1DCeryLImqqjgPiAyUzw3k" # vlm = GoogleGeminiVLM("gemini-2.5-flash-preview-05-20") print(f'inside the function to get the bbox from gemini') # vlm = GoogleGeminiVLM("gemini-2.5-flash") @@ -544,13 +552,21 @@ def gaze(robot, direction: str) -> None: """Move the hand to look in a certain direction.""" look_pose = direction_to_pose[direction] move_hand_to_relative_pose(robot, look_pose) - open_gripper(robot) + # open_gripper(robot) -def wipe_online(robot: Robot, lease_client: LeaseClient, lease_keepalive: LeaseKeepAlive, localizer: SpotLocalizer, vlm_query_template: str, z_offset: float) -> None: +def wipe_online( + robot: Robot, + lease_client: LeaseClient, + lease_keepalive: LeaseKeepAlive, + localizer: SpotLocalizer, + vlm_query_template: Optional[str] = None, + z_offset: float = DEFAULT_WIPE_ONLINE_Z_OFFSET, +) -> None: # stow the arm stow_arm(robot) # have the robot look ahead to look at the spill - gaze(robot, "DOWN") + gaze(robot, "AHEAD") + # gaze(robot, "DOWN") # capture the image using the hand camera rgbds = capture_images(robot, localizer, camera_names=["hand_color_image"]) @@ -592,12 +608,16 @@ def wipe_online(robot: Robot, lease_client: LeaseClient, lease_keepalive: LeaseK rr.log("rgb", rr.Image(rgb_img)) rr.log("depth", rr.Image(depth_img)) + # assert False + ## call the gemini endpoint to get the bbox prediction corresponding to the spill # vlm_query_template = """ # Identify the spill/stain on the surface and provide a bounding box around it. # The answer should follow the json format: {"bbox": [ymin, xmin, ymax, xmax], "label": "spill"}. # The coordinates are in [ymin, xmin, ymax, xmax] format normalized to 0-1000. # """ + if vlm_query_template is None: + vlm_query_template = DEFAULT_WIPE_VLM_QUERY_TEMPLATE # vlm_query_template = "I have an image with some text written on it, and I am interested in finding a bounding box for it. Can you give me the coordinates of the bounding box that encloses the written text?" bbox = get_bbox_from_gemini(vlm_query_template, rgb_pil) print(f"Bbox: {bbox}") @@ -659,7 +679,7 @@ def main() -> None: parser.add_argument( "--z_offset", type=float, - default=0.02, + default=0.00, help="Hand Z offset above surface in meters (clearance).", ) args = parser.parse_args() diff --git a/spot_utils/pretrained_model_interface.py b/spot_utils/pretrained_model_interface.py new file mode 100644 index 0000000..f555092 --- /dev/null +++ b/spot_utils/pretrained_model_interface.py @@ -0,0 +1,449 @@ +"""Interface to pretrained large models. + +These might be joint Vision-Language Models (VLM's) or Large Language +Models (LLM's) +""" + +import abc +import base64 +import logging +import os +from io import BytesIO +from typing import Collection, Dict, List, Optional, Union + +import google.generativeai as genai +import imagehash +import openai +import PIL.Image +from tenacity import retry, stop_after_attempt, wait_random_exponential + +# This is a special string that we assume will never appear in a prompt, and +# which we use to separate prompt and completion in the cache. The reason to +# do it this way, rather than saving the prompt and responses separately, +# is that we want it to be easy to browse the cache as text files. 
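+# Minimal usage sketch (assumes GOOGLE_API_KEY is set in the environment; the
+# model name is only an example):
+#   vlm = GoogleGeminiVLM("gemini-2.5-pro")
+#   completions = vlm.sample_completions(prompt, imgs=[pil_image],
+#                                        temperature=0.0, seed=42,
+#                                        num_completions=1)
+# With temperature 0, the response is cached on disk under
+# pretrained_model_prompt_cache_dir, e.g.
+#   cache-fixme/Google-gemini-2.5-pro_most_likely_1_None_<prompt-and-image-hashes>/
+#       prompt.txt   (the prompt and completions joined by _CACHE_SEP)
+#       imgs/0.jpg   (a copy of each query image)
+# so repeated queries with the same prompt and images are answered from disk.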
+_CACHE_SEP = "\n####$$$###$$$####$$$$###$$$####$$$###$$$###\n" + +pretrained_model_prompt_cache_dir = "cache-fixme" + + +class PretrainedLargeModel(abc.ABC): + """A pretrained large vision or language model.""" + + @abc.abstractmethod + def get_id(self) -> str: + """Get a string identifier for this model. + + This identifier should include sufficient information so that + querying the same model with the same prompt and same identifier + should yield the same result (assuming temperature 0). + """ + raise NotImplementedError("Override me!") + + @abc.abstractmethod + def _sample_completions(self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: + """This is the main method that subclasses must implement. + + This helper method is called by sample_completions(), which + caches the prompts and responses to disk. + """ + raise NotImplementedError("Override me!") + + def sample_completions(self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: + """Sample one or more completions from a prompt. + + Higher temperatures will increase the variance in the responses. + The seed may not be used and the results may therefore not be + reproducible for models where we only have access through an API + that does not expose the ability to set a random seed. Responses + are saved to disk. + """ + # Set up the cache file. + assert _CACHE_SEP not in prompt + os.makedirs(pretrained_model_prompt_cache_dir, exist_ok=True) + model_id = self.get_id() + prompt_id = hash(prompt) + config_id = f"{temperature}_{seed}_{num_completions}_" + \ + f"{stop_token}" + # If the temperature is 0, the seed does not matter. + if temperature == 0.0: + config_id = f"most_likely_{num_completions}_{stop_token}" + cache_foldername = f"{model_id}_{config_id}_{prompt_id}" + if imgs is not None: + # We also need to hash all the images in the prompt. + img_hash_list: List[str] = [] + for img in imgs: + img_hash_list.append(str(imagehash.phash(img))) + # NOTE: it's possible that this string (the concatenated hashes of + # each image) is very long. This would make the final cache + # foldername long. In many operating systems, the maximum folder + # name length is 255 characters. To shorten this foldername more, we + # can hash this string into a shorter string. For example, look at + # https://stackoverflow.com/questions/57263436/hash-like-string-shortener-with-decoder # pylint:disable=line-too-long + imgs_id = hash("".join(img_hash_list)) + cache_foldername += f"{imgs_id}" + cache_folderpath = os.path.join(pretrained_model_prompt_cache_dir, + cache_foldername) + os.makedirs(cache_folderpath, exist_ok=True) + cache_filename = "prompt.txt" + cache_filepath = os.path.join(pretrained_model_prompt_cache_dir, + cache_foldername, cache_filename) + if not os.path.exists(cache_filepath): + llm_use_cache_only = False + if llm_use_cache_only: + raise ValueError("No cached response found for prompt.") + logging.debug(f"Querying model {model_id} with new prompt.") + # Query the model. + completions = self._sample_completions(prompt, imgs, temperature, + seed, stop_token, + num_completions) + # Cache the completion. 
+ cache_str = prompt + _CACHE_SEP + _CACHE_SEP.join(completions) + with open(cache_filepath, 'w', encoding='utf-8') as f: + f.write(cache_str) + if imgs is not None: + # Also save the images for easy debugging. + imgs_folderpath = os.path.join(cache_folderpath, "imgs") + os.makedirs(imgs_folderpath, exist_ok=True) + for i, img in enumerate(imgs): + filename_suffix = str(i) + ".jpg" + img.save(os.path.join(imgs_folderpath, filename_suffix)) + logging.debug(f"Saved model response to {cache_filepath}.") + # Load the saved completion. + with open(cache_filepath, 'r', encoding='utf-8') as f: + cache_str = f.read() + logging.debug(f"Loaded model response from {cache_filepath}.") + assert cache_str.count(_CACHE_SEP) == num_completions + cached_prompt, completion_strs = cache_str.split(_CACHE_SEP, 1) + assert cached_prompt == prompt + completions = completion_strs.split(_CACHE_SEP) + return completions + + +class VisionLanguageModel(PretrainedLargeModel): + """A class for all VLM's.""" + + def sample_completions( + self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: # pragma: no cover + assert imgs is not None + return super().sample_completions(prompt, imgs, temperature, seed, + stop_token, num_completions) + + +class LargeLanguageModel(PretrainedLargeModel): + """A class for all LLM's.""" + + def sample_completions( + self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: # pragma: no cover + assert imgs is None + return super().sample_completions(prompt, imgs, temperature, seed, + stop_token, num_completions) + + +class OpenAIModel(): + """Common interface with methods for all OpenAI-based models.""" + + def set_openai_key(self, key: Optional[str] = None) -> None: + """Set the OpenAI API key.""" + if key is None: + assert "OPENAI_API_KEY" in os.environ + key = os.environ["OPENAI_API_KEY"] + + # @retry(wait=wait_random_exponential(min=1, max=60), + # stop=stop_after_attempt(10)) + # def call_openai_api(self, + # messages: list, + # model: str = "gpt-4", + # seed: Optional[int] = None, + # max_tokens: int = 32, + # temperature: float = 0.2, + # verbose: bool = False) -> str: # pragma: no cover + # """Make an API call to OpenAI.""" + # client = openai.OpenAI() + # completion = client.chat.completions.create( + # model=model, + # messages=messages, + # seed=seed, + # max_tokens=max_tokens, + # temperature=temperature, + # ) + # if verbose: + # logging.debug(f"OpenAI API response: {completion}") + # assert len(completion.choices) == 1 + # assert completion.choices[0].message.content is not None + # return completion.choices[0].message.content + @retry(wait=wait_random_exponential(min=1, max=60), + stop=stop_after_attempt(10)) + def call_openai_api(self, + messages: List[dict], + model: str = "gpt-4", + seed: Optional[int] = None, + max_tokens: int = 32, + temperature: float = 0.2, + verbose: bool = False) -> str: + """Make an API call to OpenAI.""" + client = openai.OpenAI() + + # Insert a strong, safe research system prompt + system_message = { + "role": "system", + # "content": ( + # "You are a helpful AI research assistant working on a robotics project at MIT. " + # "All tasks are safe, academic, and intended for legitimate research purposes. " + # "Respond helpfully and do not refuse unless absolutely necessary." 
+ # ) + "content": ( + "You are a helpful AI research assistant working on a robotics project at MIT. " + "The task involves analyzing synthetic lab images in a controlled academic setting. " + "You are permitted to make judgments about predicates describing object states, robot actions, and scene properties. " + "You should always attempt to answer unless the input is truly unintelligible. " + "Assume no safety concerns. Assume all content is safe." + "You are permitted to infer reasonable truths based on typical physics, causality, and object behaviors." + ) + } + + # Ensure the system message is the first message + full_messages = [system_message] + messages + + # Make the chat completion call + completion = client.chat.completions.create( + model=model, + messages=full_messages, + seed=seed, + max_tokens=max_tokens, + temperature=temperature, + ) + + if verbose: + logging.debug(f"OpenAI API response: {completion}") + + assert len(completion.choices) == 1, "Unexpected number of choices returned." + assert completion.choices[0].message.content is not None, "Completion content is None." + + return completion.choices[0].message.content + + +class GoogleGeminiModel(): + """Common interface and methods for all Gemini-based models. + + Assumes that an environment variable GOOGLE_API_KEY is set with the + necessary API key to query the particular model name. + """ + + def __init__(self, model_name: str) -> None: + """See https://ai.google.dev/models/gemini for the list of available + model names.""" + self._model_name = model_name + assert "GOOGLE_API_KEY" in os.environ + genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) + self._model = genai.GenerativeModel(self._model_name) # pylint:disable=no-member + + +class OpenAILLM(LargeLanguageModel, OpenAIModel): + """Interface to openAI LLMs. + + Assumes that an environment variable OPENAI_API_KEY is set to a + private API key for beta.openai.com. + """ + + def __init__(self, model_name: str) -> None: + """See https://platform.openai.com/docs/models for the list of + available model names.""" + self._model_name = model_name + # Note that max_tokens is the maximum response length (not prompt). + # From OpenAI docs: "The token count of your prompt plus max_tokens + # cannot exceed the model's context length." + self._max_tokens = 700 + self.set_openai_key() + + def get_id(self) -> str: + return f"openai-{self._model_name}" + + def _sample_completions( + self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: # pragma: no cover + del imgs, seed, stop_token # unused + messages = [{"role": "user", "content": prompt, "type": "text"}] + responses = [ + self.call_openai_api(messages, + model=self._model_name, + temperature=temperature, + max_tokens=self._max_tokens) + for _ in range(num_completions) + ] + return responses + + +class GoogleGeminiLLM(LargeLanguageModel, GoogleGeminiModel): + """Interface to the Google Gemini VLM (1.5). + + Assumes that an environment variable GOOGLE_API_KEY is set with the + necessary API key to query the particular model name. 
+ """ + + @retry(wait=wait_random_exponential(min=1, max=60), + stop=stop_after_attempt(10)) + def _sample_completions( + self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: # pragma: no cover + del seed, stop_token # unused + assert imgs is None + generation_config = genai.types.GenerationConfig( # pylint:disable=no-member + candidate_count=num_completions, + temperature=temperature) + response = self._model.generate_content( + [prompt], generation_config=generation_config) # type: ignore + response.resolve() # type: ignore + return [response.text] + + def get_id(self) -> str: + return f"Google-{self._model_name}" + + +class GoogleGeminiVLM(VisionLanguageModel, GoogleGeminiModel): + """Interface to the Google Gemini VLM (1.5). + + Assumes that an environment variable GOOGLE_API_KEY is set with the + necessary API key to query the particular model name. + """ + + @retry(wait=wait_random_exponential(min=1, max=60), + stop=stop_after_attempt(20)) + def _sample_completions( + self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1) -> List[str]: # pragma: no cover + del seed, stop_token # unused + assert imgs is not None + generation_config = genai.types.GenerationConfig( # pylint:disable=no-member + candidate_count=num_completions, + temperature=temperature) + response = self._model.generate_content( + [prompt] + imgs, # type: ignore + generation_config=generation_config) # type: ignore + # import pdb; pdb.set_trace() + response.resolve() # type: ignore + return [response.text] + + def get_id(self) -> str: + return f"Google-{self._model_name}" + + +class OpenAIVLM(VisionLanguageModel, OpenAIModel): + """Interface for OpenAI's VLMs, including GPT-4 Turbo (and preview + versions).""" + + def __init__(self, model_name: str): + """Initialize with a specific model name.""" + self.model_name = model_name + # Note that max_tokens is the maximum response length (not prompt). + # From OpenAI docs: "The token count of your prompt plus max_tokens + # cannot exceed the model's context length." 
+ self._max_tokens = 700 + self.set_openai_key() + + def prepare_vision_messages( + self, + images: List[PIL.Image.Image], + prefix: Optional[str] = None, + suffix: Optional[str] = None, + image_size: Optional[int] = 512, + detail: str = "auto" + ) -> List[Dict[str, Union[str, List[Dict[str, str]], List[Dict[ + str, Collection[str]]]]]]: + """Prepare text and image messages for the OpenAI API.""" + content: List[Dict[str, Union[str, Collection[str]]]] = [] + if prefix: + content.append({"text": prefix, "type": "text"}) + assert images + assert detail in ["auto", "low", "high"] + for img in images: + img_resized = img + if image_size: + factor = image_size / max(img.size) + img_resized = img.resize( + (int(img.size[0] * factor), int(img.size[1] * factor))) + # Convert the image to PNG format and encode it in base64 + buffer = BytesIO() + img_resized.save(buffer, format="PNG") + buf = buffer.getvalue() + frame = base64.b64encode(buf).decode("utf-8") + content_str = { + "image_url": { + "url": f"data:image/png;base64,{frame}", + "detail": detail + }, + "type": "image_url" + } + content.append(content_str) + if suffix: + content.append({"text": suffix, "type": "text"}) + return [{"role": "user", "content": content}] + + def get_id(self) -> str: + """Get an identifier for the model.""" + return f"OpenAI-{self.model_name}" + + def _sample_completions( + self, + prompt: str, + imgs: Optional[List[PIL.Image.Image]], + temperature: float, + seed: int, + stop_token: Optional[str] = None, + num_completions: int = 1, + ) -> List[str]: # pragma: no cover + """Query the model and get responses.""" + del seed, stop_token # unused. + if imgs is None: + raise ValueError("images cannot be None") + messages = self.prepare_vision_messages(prefix=prompt, + images=imgs, + detail="auto") + responses = [ + self.call_openai_api(messages, model=self.model_name, max_tokens=self._max_tokens, temperature=temperature) + for _ in range(num_completions) + ] + # while any("sorry" in response.lower() for response in responses): + # responses = [ + # self.call_openai_api(messages, model=self.model_name, max_tokens=self._max_tokens, temperature=temperature) + # for _ in range(num_completions) + # ] + return responses From 725d2aa7b97861c21835b5384708de24bce271fd Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Mon, 3 Nov 2025 15:43:14 -0500 Subject: [PATCH 4/6] making wipe skill more robust + logging things in rerun --- skills/wipe_online.py | 123 +++++++++++++++++++++++++----------------- 1 file changed, 74 insertions(+), 49 deletions(-) diff --git a/skills/wipe_online.py b/skills/wipe_online.py index 0dd7200..e336e79 100644 --- a/skills/wipe_online.py +++ b/skills/wipe_online.py @@ -31,6 +31,8 @@ from PIL import Image import open3d as o3d +rr.init("wipe_online", spawn=True) + def init_robot(hostname: str, map_name: str) -> tuple[Robot, LeaseClient, LeaseKeepAlive, SpotLocalizer]: sdk = create_standard_sdk("WipeOnlineClient") robot = sdk.create_robot(hostname) @@ -143,14 +145,13 @@ def draw_bounding_box(image_path, bbox_pixels, color=(0, 255, 0), thickness=2): # return img_out return output_path -def move_hand_to_bbox_bottom_right( - robot: Robot, +def compute_target_pose_from_bbox( rgbd, bbox_pixels: list[int], - z_clearance_m: float = 0.01, -): + z_clearance_m: float = 0.02, +) -> math_helpers.SE3Pose: """ - Move the hand to the 3D point corresponding to the bbox bottom-right pixel. + Compute the target pose from the bbox pixels. bbox_pixels: [ymin, xmin, ymax, xmax] in pixel units. 
""" ymin, xmin, ymax, xmax = bbox_pixels @@ -195,7 +196,8 @@ def move_hand_to_bbox_bottom_right( rot=math_helpers.Quat.from_pitch(np.pi / 2), ) - move_hand_to_relative_pose(robot, target_pose) + # move_hand_to_relative_pose(robot, target_pose) + return target_pose def wipe_one_stroke( robot: Robot, @@ -350,7 +352,7 @@ def _compute_wipe_params_from_bbox( else: side_dir = np.array([0.0, 0.0]) delta_x_y_between_strokes = (float(side_dir[0] * spacing_m), float(side_dir[1] * spacing_m)) - num_strokes = max(1, int(width_m / max(spacing_m, 1e-3))) + num_strokes = max(1, int(width_m / max(spacing_m, 1e-3))+1) end_look_pose = math_helpers.SE3Pose( x=0.65, y=0.0, z=0.4, rot=math_helpers.Quat.from_pitch(np.pi / 2.5) @@ -552,7 +554,7 @@ def gaze(robot, direction: str) -> None: """Move the hand to look in a certain direction.""" look_pose = direction_to_pose[direction] move_hand_to_relative_pose(robot, look_pose) - # open_gripper(robot) + open_gripper(robot) def wipe_online( robot: Robot, @@ -590,57 +592,84 @@ def wipe_online( save_folderpath = "wipe_online_images" os.makedirs(save_folderpath, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + ## save the rgb and the depth image to the disk rgb_pil = Image.fromarray(rgb_img) depth_pil = Image.fromarray(depth_img) - rgb_pil.save(os.path.join(save_folderpath, f"rgb_{timestamp}.png")) - depth_pil.save(os.path.join(save_folderpath, f"depth_{timestamp}.png")) + rgb_image_path = os.path.join(save_folderpath, f"rgb_{timestamp}.png") + depth_image_path = os.path.join(save_folderpath, f"depth_{timestamp}.png") + rgb_pil.save(rgb_image_path) + depth_pil.save(depth_image_path) - points, colors = get_points_from_pixels(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), os.path.join(save_folderpath, f"depth_{timestamp}.png"), intrinsics) - # use the rgb image to detect the spill by querying the Gemini endpoint - # spill_pixel = get_pixel_from_gemini(vlm_query_template, rgb_pil) + points, colors = get_points_from_pixels(rgb_image_path, depth_image_path, intrinsics) pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(points.astype(np.float32)) pcd.colors = o3d.utility.Vector3dVector(colors.astype(np.float32)) # o3d.visualization.draw_geometries([pcd]) + voxel_size = 0.005 - rr.log("3D_points", rr.Points3D(positions=points, colors=colors, radii=voxel_size/2)) - rr.log("rgb", rr.Image(rgb_img)) - rr.log("depth", rr.Image(depth_img)) - - # assert False - - ## call the gemini endpoint to get the bbox prediction corresponding to the spill -# vlm_query_template = """ -# Identify the spill/stain on the surface and provide a bounding box around it. -# The answer should follow the json format: {"bbox": [ymin, xmin, ymax, xmax], "label": "spill"}. -# The coordinates are in [ymin, xmin, ymax, xmax] format normalized to 0-1000. 
-# """ + rgb = cv2.cvtColor(cv2.imread(rgb_image_path), cv2.COLOR_BGR2RGB) + rr.log('camera/rgb', rr.Image(rgb)) + + depth = cv2.imread(depth_image_path, cv2.IMREAD_UNCHANGED).astype(np.float32) / 1000.0 + print(f'printing the max and min values of the depth image : {depth.max()} and {depth.min()}') + rr.log('camera/depth', rr.Image(depth)) + + # Transform points from camera frame to BODY frame for consistent visualization with target_pose + T_vision_cam = get_a_tform_b( + rgbd.transforms_snapshot, VISION_FRAME_NAME, rgbd.frame_name_image_sensor + ).to_matrix() + T_body_vision = get_a_tform_b( + rgbd.transforms_snapshot, BODY_FRAME_NAME, VISION_FRAME_NAME + ).to_matrix() + T_body_cam = T_body_vision @ T_vision_cam + + points_cam = points.astype(np.float32) + num_pts = points_cam.shape[0] + if num_pts > 0: + points_cam_h = np.concatenate([points_cam, np.ones((num_pts, 1), dtype=np.float32)], axis=1) + points_body_h = (T_body_cam @ points_cam_h.T).T + points_body = points_body_h[:, :3].astype(np.float32) + else: + points_body = points_cam + + # Log the 3D points in BODY frame + rr.log('scene/points3d_body', rr.Points3D(positions=points_body, colors=colors, radii=voxel_size/2)) + if vlm_query_template is None: vlm_query_template = DEFAULT_WIPE_VLM_QUERY_TEMPLATE # vlm_query_template = "I have an image with some text written on it, and I am interested in finding a bounding box for it. Can you give me the coordinates of the bounding box that encloses the written text?" bbox = get_bbox_from_gemini(vlm_query_template, rgb_pil) - print(f"Bbox: {bbox}") - # output_path = visualize_bbox_prediction(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), bbox) - annotated_image_path = draw_bounding_box(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), bbox) - # Convert BGR (cv2) -> RGB before logging - # rr.log("bounding_box_image", rr.Image(cv2.cvtColor(annotated_img, cv2.COLOR_BGR2RGB))) - annotated_img = Image.open(annotated_image_path) - rr.log("annotated_image", rr.Image(annotated_img)) - ## move the hand to the bottom-right of the bounding box to verify position - move_hand_to_bbox_bottom_right(robot, rgbd, bbox, z_clearance_m=z_offset) ## this should move the hand to the bottom-right position + print(f"The coordinates of the bounding box are: {bbox}") - ## once the hand has been moved to the bottom-right of the bounding box, - ## we need to start executing the parameterized skill for wiping the spill + ## log the annotated image with the bounding box + annotated_image_path = draw_bounding_box(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), bbox) + annotated_img = cv2.cvtColor(cv2.imread(annotated_image_path), cv2.COLOR_BGR2RGB) + rr.log('results/annotated', rr.Image(annotated_img)) + + ## move the hand to the bottom-right position of the bounding box + target_pose = compute_target_pose_from_bbox(rgbd, bbox, z_clearance_m=z_offset) + # Log a red sphere at the target pose position + rr.log( + 'results/target_pose_marker', + rr.Points3D( + positions=np.array([[target_pose.x, target_pose.y, target_pose.z]], dtype=np.float32), + colors=np.array([[255, 0, 0]], dtype=np.uint8), + radii=0.03, + ), + ) + + move_hand_to_relative_pose(robot, target_pose) - ( - wipe_start_pose, - stroke_dx, - stroke_dy, - delta_x_y_between_strokes, - num_strokes, - end_look_pose, - ) = _compute_wipe_params_from_bbox(rgbd, bbox, clearance=z_offset, spacing_m=0.05, max_stroke_len=0.35) + ## compute the wipe parameters from the bounding box coordinates + wipe_start_pose, stroke_dx, stroke_dy, 
delta_x_y_between_strokes, num_strokes, end_look_pose = _compute_wipe_params_from_bbox(rgbd, bbox, clearance=z_offset, spacing_m=0.05, max_stroke_len=0.35) + # ## log the wipe parameters in rerun + # rr.log('results/wipe_start_pose', rr.Pose3D(position=wipe_start_pose.position, rotation=wipe_start_pose.rotation)) + # rr.log('results/stroke_dx', stroke_dx) + # rr.log('results/stroke_dy', stroke_dy) + # rr.log('results/delta_x_y_between_strokes', delta_x_y_between_strokes) + # rr.log('results/num_strokes', num_strokes) + # rr.log('results/end_look_pose', rr.Pose3D(position=end_look_pose.position, rotation=end_look_pose.rotation)) # Example: run a single stroke first (uncomment to test) # wipe_one_stroke(robot, wipe_start_pose, move_dx=stroke_dx, move_dy=stroke_dy, duration=1.0) @@ -658,10 +687,6 @@ def wipe_online( num_attempts_per_stroke=1, ) - ## once the bounding box coordinates have been successfully obtained - ## we need to place the arm in the right position (one of the corners of the bounding box) to clean the spill - ## we have the identified pixels/bb-coordinates in the camera frame (hand-camera) - def main() -> None: parser = argparse.ArgumentParser(description="Online wiping controller.") parser.add_argument( @@ -684,7 +709,7 @@ def main() -> None: ) args = parser.parse_args() robot, lease_client, lease_keepalive, localizer = init_robot(args.hostname, args.map_name) - rr.init("wipe_online", spawn=True) + # rr.init("wipe_online", spawn=True) vlm_query_template = """ I have an image with some text written on it, and I am interested in finding a bounding box for it. Can you give me the coordinates of the bounding box that encloses the written text? From a789054979619a0bdd3f9cb72ce9898f0816d928 Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Mon, 3 Nov 2025 19:06:15 -0500 Subject: [PATCH 5/6] added visualization of the corners of the wipe surface, wiping surface itself, and the wipe waypoints --- skills/wipe_online.py | 63 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/skills/wipe_online.py b/skills/wipe_online.py index e336e79..3cae03e 100644 --- a/skills/wipe_online.py +++ b/skills/wipe_online.py @@ -663,17 +663,62 @@ def wipe_online( ## compute the wipe parameters from the bounding box coordinates wipe_start_pose, stroke_dx, stroke_dy, delta_x_y_between_strokes, num_strokes, end_look_pose = _compute_wipe_params_from_bbox(rgbd, bbox, clearance=z_offset, spacing_m=0.05, max_stroke_len=0.35) - # ## log the wipe parameters in rerun - # rr.log('results/wipe_start_pose', rr.Pose3D(position=wipe_start_pose.position, rotation=wipe_start_pose.rotation)) - # rr.log('results/stroke_dx', stroke_dx) - # rr.log('results/stroke_dy', stroke_dy) - # rr.log('results/delta_x_y_between_strokes', delta_x_y_between_strokes) - # rr.log('results/num_strokes', num_strokes) - # rr.log('results/end_look_pose', rr.Pose3D(position=end_look_pose.position, rotation=end_look_pose.rotation)) - # Example: run a single stroke first (uncomment to test) - # wipe_one_stroke(robot, wipe_start_pose, move_dx=stroke_dx, move_dy=stroke_dy, duration=1.0) + # Visualize the wipe surface in BODY frame: corners, mesh, and stroke paths + def _as_np_pose(p): + return np.array([p.x, p.y, p.z], dtype=np.float32) + start = _as_np_pose(wipe_start_pose) + stroke_vec = np.array([stroke_dx, stroke_dy, 0.0], dtype=np.float32) + delta_vec = np.array([delta_x_y_between_strokes[0], delta_x_y_between_strokes[1], 0.0], dtype=np.float32) + + # Corners A (start), B (start + stroke), 
D (last row start), C (last row end) + A = start + B = start + stroke_vec + D = start + max(int(num_strokes) - 1, 0) * delta_vec + C = D + stroke_vec + + corners_body = np.stack([A, B, C, D], axis=0).astype(np.float32) + + # i) visualize the 3D corners + rr.log( + 'scene/wipe_surface/corners', + rr.Points3D( + positions=corners_body, + colors=np.array([[0, 128, 255]] * 4, dtype=np.uint8), + radii=0.01, + ), + ) + + # ii) visualize the wipe surface polygon (two triangles) + rr.log( + 'scene/wipe_surface/mesh', + rr.Mesh3D( + vertex_positions=corners_body, + triangle_indices=np.array([[0, 1, 2], [0, 2, 3]], dtype=np.uint32), + vertex_colors=np.array([[0, 255, 0, 80]] * 4, dtype=np.uint8), + ), + ) + + # iii) Visualize the wipe strokes (paths) + strokes = [] + curr = start.copy() + for _ in range(int(max(num_strokes, 0))): + s = curr + e = curr + stroke_vec + strokes.append(np.stack([s, e], axis=0)) + curr = curr + delta_vec + + if len(strokes) > 0: + rr.log( + 'scene/wipe_surface/strokes', + rr.LineStrips3D( + strips=strokes, + colors=np.array([[255, 0, 0]], dtype=np.uint8), + radii=0.005, + ), + ) + # Run multi-stroke wipe wipe_multiple_strokes( robot=robot, From 12729e8020fe7c432566270f16d185527568f3a7 Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Mon, 3 Nov 2025 20:26:53 -0500 Subject: [PATCH 6/6] increase percentage bounding box if required --- skills/wipe_online.py | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/skills/wipe_online.py b/skills/wipe_online.py index 3cae03e..0e33788 100644 --- a/skills/wipe_online.py +++ b/skills/wipe_online.py @@ -339,6 +339,7 @@ def _compute_wipe_params_from_bbox( up_dir = np.array([0.0, 0.0]) else: up_dir = up_vec[:2] / up_len + # Stroke length from bbox height, limited by max_stroke_len stroke_len = min(up_len, max_stroke_len) stroke_dx = float(up_dir[0] * stroke_len) stroke_dy = float(up_dir[1] * stroke_len) @@ -563,6 +564,7 @@ def wipe_online( localizer: SpotLocalizer, vlm_query_template: Optional[str] = None, z_offset: float = DEFAULT_WIPE_ONLINE_Z_OFFSET, + expand_percentage: float = 0.0, ) -> None: # stow the arm stow_arm(robot) @@ -642,6 +644,21 @@ def wipe_online( bbox = get_bbox_from_gemini(vlm_query_template, rgb_pil) print(f"The coordinates of the bounding box are: {bbox}") + # Optionally expand bbox in image space by a percentage along all directions + if expand_percentage and expand_percentage > 0.0: + ymin, xmin, ymax, xmax = bbox + H, W = rgb_img.shape[0], rgb_img.shape[1] + height_px = max(1, (ymax - ymin)) + width_px = max(1, (xmax - xmin)) + dy = int(round(0.5 * expand_percentage * height_px)) + dx = int(round(0.5 * expand_percentage * width_px)) + ymin_exp = max(0, ymin - dy) + ymax_exp = min(H - 1, ymax + dy) + xmin_exp = max(0, xmin - dx) + xmax_exp = min(W - 1, xmax + dx) + bbox = [ymin_exp, xmin_exp, ymax_exp, xmax_exp] + print(f"Expanded bbox by {expand_percentage*100:.1f}% -> {bbox}") + ## log the annotated image with the bounding box annotated_image_path = draw_bounding_box(os.path.join(save_folderpath, f"rgb_{timestamp}.png"), bbox) annotated_img = cv2.cvtColor(cv2.imread(annotated_image_path), cv2.COLOR_BGR2RGB) @@ -662,7 +679,13 @@ def wipe_online( move_hand_to_relative_pose(robot, target_pose) ## compute the wipe parameters from the bounding box coordinates - wipe_start_pose, stroke_dx, stroke_dy, delta_x_y_between_strokes, num_strokes, end_look_pose = _compute_wipe_params_from_bbox(rgbd, bbox, clearance=z_offset, spacing_m=0.05, max_stroke_len=0.35) + 
wipe_start_pose, stroke_dx, stroke_dy, delta_x_y_between_strokes, num_strokes, end_look_pose = _compute_wipe_params_from_bbox( + rgbd, + bbox, + clearance=z_offset, + spacing_m=0.05, + max_stroke_len=0.35, + ) # Visualize the wipe surface in BODY frame: corners, mesh, and stroke paths def _as_np_pose(p): @@ -752,6 +775,12 @@ def main() -> None: default=0.00, help="Hand Z offset above surface in meters (clearance).", ) + parser.add_argument( + "--expand_percentage", + type=float, + default=0.0, + help="Fraction to expand bbox in image space (e.g., 0.2 for +20%).", + ) args = parser.parse_args() robot, lease_client, lease_keepalive, localizer = init_robot(args.hostname, args.map_name) # rr.init("wipe_online", spawn=True) @@ -760,7 +789,15 @@ def main() -> None: I have an image with some text written on it, and I am interested in finding a bounding box for it. Can you give me the coordinates of the bounding box that encloses the written text? The answer should follow the json format: {"bbox": [ymin, xmin, ymax, xmax], "label": "spill"}. The coordinates are in [ymin, xmin, ymax, xmax] format normalized to 0-1000.""" - wipe_online(robot, lease_client, lease_keepalive, localizer, vlm_query_template, args.z_offset) + wipe_online( + robot, + lease_client, + lease_keepalive, + localizer, + vlm_query_template, + args.z_offset, + args.expand_percentage, + ) if __name__ == "__main__": main() \ No newline at end of file
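A note on coordinate conventions across these patches: the VLM query template asks Gemini for a bounding box normalized to 0-1000, while the expansion logic in PATCH 6/6 and _compute_wipe_params_from_bbox work on pixel coordinates. The conversion between the two is not shown in the hunks above and presumably happens inside get_bbox_from_gemini or draw_bounding_box. A minimal sketch of that conversion, using the hypothetical helper name denormalize_bbox, is included here only for reference:

# Hypothetical helper, not part of the patches above: convert a Gemini-style
# bbox normalized to 0-1000 into integer pixel coordinates for an image of
# shape (H, W). Assumes the [ymin, xmin, ymax, xmax] ordering used in the
# VLM query template.
from typing import List, Tuple


def denormalize_bbox(bbox_norm: List[int], image_hw: Tuple[int, int]) -> List[int]:
    H, W = image_hw
    ymin, xmin, ymax, xmax = bbox_norm

    def to_px(v: int, size: int) -> int:
        # Scale from the 0-1000 range to a pixel index and clamp to the image bounds.
        return min(size - 1, max(0, int(round(v / 1000.0 * (size - 1)))))

    return [to_px(ymin, H), to_px(xmin, W), to_px(ymax, H), to_px(xmax, W)]


# Example: a box covering the central half of a 480x640 hand-camera image.
print(denormalize_bbox([250, 250, 750, 750], (480, 640)))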
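The bbox expansion introduced in PATCH 6/6 is written inline in wipe_online; the same clamped expansion can also be exercised offline as a pure function, which makes the image-space math easy to sanity-check without a robot. The sketch below mirrors the inline logic; the helper name expand_bbox and the example invocation are illustrative, not part of the patch:

# Hypothetical refactor mirroring the inline expansion from PATCH 6/6: grow a
# pixel-space [ymin, xmin, ymax, xmax] bbox by expand_percentage of its own
# height/width (split evenly on both sides) and clamp to the image bounds.
from typing import List, Tuple


def expand_bbox(bbox: List[int], image_hw: Tuple[int, int], expand_percentage: float) -> List[int]:
    ymin, xmin, ymax, xmax = bbox
    H, W = image_hw
    height_px = max(1, ymax - ymin)
    width_px = max(1, xmax - xmin)
    dy = int(round(0.5 * expand_percentage * height_px))
    dx = int(round(0.5 * expand_percentage * width_px))
    return [
        max(0, ymin - dy),
        max(0, xmin - dx),
        min(H - 1, ymax + dy),
        min(W - 1, xmax + dx),
    ]


# Example: expanding a 100x200-pixel box by 20% inside a 480x640 image.
print(expand_bbox([100, 200, 200, 400], (480, 640), 0.2))  # [90, 180, 210, 420]

On the robot, the same behaviour is driven by the new --expand_percentage flag added to main() in skills/wipe_online.py (for example 0.2 for a 20% expansion).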