|
| 1 | +import os |
| 2 | +import time |
| 3 | +import webbrowser |
| 4 | + |
| 5 | +import meshcat.transformations as tf |
| 6 | +import numpy as np |
| 7 | +import placo |
| 8 | +import rospy |
| 9 | +from placo_utils.visualization import frame_viz, robot_frame_viz, robot_viz |
| 10 | + |
| 11 | +from xrobotoolkit_teleop.hardware.galaxea import A1XController |
| 12 | +from xrobotoolkit_teleop.utils.geometry import ( |
| 13 | + R_HEADSET_TO_WORLD, |
| 14 | + apply_delta_pose, |
| 15 | + quat_diff_as_angle_axis, |
| 16 | +) |
| 17 | +from xrobotoolkit_teleop.utils.gripper_utils import calc_parallel_gripper_position |
| 18 | +from xrobotoolkit_teleop.utils.path_utils import ASSET_PATH |
| 19 | +from xrobotoolkit_teleop.utils.xr_client import XrClient |
| 20 | + |
| 21 | +# Default paths and configurations for a single left arm |
| 22 | +DEFAULT_SINGLE_A1X_URDF_PATH = os.path.join(ASSET_PATH, "galaxea/A1X/a1x.urdf") |
| 23 | +DEFAULT_SCALE_FACTOR = 1.0 |
| 24 | +CONTROLLER_DEADZONE = 0.1 |
| 25 | + |
| 26 | +# Default end-effector configuration for a single left arm without a gripper |
| 27 | +DEFAULT_END_EFFECTOR_CONFIG = { |
| 28 | + "left_arm": { |
| 29 | + "link_name": "gripper_link", # URDF link name for the single arm |
| 30 | + "pose_source": "left_controller", |
| 31 | + "control_trigger": "left_grip", |
| 32 | + "gripper_config": { |
| 33 | + "joint_name": "left_gripper_finger_joint1", |
| 34 | + "gripper_trigger": "left_trigger", |
| 35 | + "open_pos": -2.0, |
| 36 | + "close_pos": 0.0, |
| 37 | + }, |
| 38 | + }, |
| 39 | +} |
| 40 | + |
| 41 | + |
| 42 | +class GalaxeaTeleopControllerSingleArm: |
| 43 | + def __init__( |
| 44 | + self, |
| 45 | + xr_client: XrClient, |
| 46 | + robot_urdf_path: str = DEFAULT_SINGLE_A1X_URDF_PATH, |
| 47 | + end_effector_config: dict = DEFAULT_END_EFFECTOR_CONFIG, |
| 48 | + R_headset_world: np.ndarray = R_HEADSET_TO_WORLD, |
| 49 | + scale_factor: float = DEFAULT_SCALE_FACTOR, |
| 50 | + visualize_placo: bool = True, |
| 51 | + ros_rate_hz: int = 100, |
| 52 | + ): |
| 53 | + # rospy.init_node("galaxea_teleop_controller_single", anonymous=True) |
| 54 | + |
| 55 | + self.xr_client = xr_client |
| 56 | + self.robot_urdf_path = robot_urdf_path |
| 57 | + self.R_headset_world = R_headset_world |
| 58 | + self.scale_factor = scale_factor |
| 59 | + self.visualize_placo = visualize_placo |
| 60 | + self.end_effector_config = end_effector_config |
| 61 | + # self.rate = rospy.Rate(ros_rate_hz) |
| 62 | + |
| 63 | + # Define joint names for the single left arm (no gripper) |
| 64 | + # left_joint_names = [f"arm_joint_{i}" for i in range(1, 7)] |
| 65 | + |
| 66 | + # Instantiate hardware controller for the left arm |
| 67 | + # Assuming the single arm uses the "left" prefix for its topics |
| 68 | + self.left_controller = A1XController() |
| 69 | + |
| 70 | + # Placo Setup |
| 71 | + self.placo_robot = placo.RobotWrapper(self.robot_urdf_path) |
| 72 | + self.solver = placo.KinematicsSolver(self.placo_robot) |
| 73 | + self.solver.dt = 1.0 / ros_rate_hz |
| 74 | + self.solver.mask_fbase(True) |
| 75 | + self.solver.add_kinetic_energy_regularization_task(1e-6) |
| 76 | + |
| 77 | + # Setup Placo tasks for the single arm |
| 78 | + self.effector_task = {} |
| 79 | + self.init_ee_xyz = {} |
| 80 | + self.init_ee_quat = {} |
| 81 | + self.init_controller_xyz = {} |
| 82 | + self.init_controller_quat = {} |
| 83 | + for name, config in self.end_effector_config.items(): |
| 84 | + self.effector_task[name] = self.solver.add_frame_task(config["link_name"], np.eye(4)) |
| 85 | + self.effector_task[name].configure(f"{name}_frame", "soft", 1.0) |
| 86 | + self.solver.add_manipulability_task(config["link_name"], "both", 1.0).configure( |
| 87 | + f"{name}_manipulability", "soft", 5e-2 |
| 88 | + ) |
| 89 | + self.init_ee_xyz[name] = None |
| 90 | + self.init_ee_quat[name] = None |
| 91 | + self.init_controller_xyz[name] = None |
| 92 | + self.init_controller_quat[name] = None |
| 93 | + |
| 94 | + self.gripper_pos_target = {} |
| 95 | + for name, config in end_effector_config.items(): |
| 96 | + if "gripper_config" in config: |
| 97 | + gripper_config = config["gripper_config"] |
| 98 | + self.gripper_pos_target[gripper_config["joint_name"]] = gripper_config["open_pos"] |
| 99 | + |
| 100 | + # Wait for the first joint state message to initialize Placo state |
| 101 | + print("Waiting for initial joint state from the Galaxea arm...") |
| 102 | + while not rospy.is_shutdown() and self.left_controller.timestamp == 0: |
| 103 | + rospy.sleep(0.1) |
| 104 | + print("Initial joint state received.") |
| 105 | + |
| 106 | + # Set initial Placo state from hardware. Assumes single arm URDF has 7 DoF base + 6 DoF arm |
| 107 | + self.placo_robot.state.q[7:13] = self.left_controller.qpos |
| 108 | + |
| 109 | + if self.visualize_placo: |
| 110 | + self.placo_robot.update_kinematics() |
| 111 | + self.placo_vis = robot_viz(self.placo_robot) |
| 112 | + time.sleep(0.5) |
| 113 | + webbrowser.open(self.placo_vis.viewer.url()) |
| 114 | + self.placo_vis.display(self.placo_robot.state.q) |
| 115 | + for name, config in self.end_effector_config.items(): |
| 116 | + robot_frame_viz(self.placo_robot, config["link_name"]) |
| 117 | + frame_viz(f"vis_target_{name}", self.effector_task[name].T_world_frame) |
| 118 | + |
| 119 | + def _process_xr_pose(self, xr_pose, arm_name: str): |
| 120 | + controller_xyz = np.array([xr_pose[0], xr_pose[1], xr_pose[2]]) |
| 121 | + controller_quat = np.array([xr_pose[6], xr_pose[3], xr_pose[4], xr_pose[5]]) # w, x, y, z |
| 122 | + |
| 123 | + controller_xyz = self.R_headset_world @ controller_xyz |
| 124 | + R_transform = np.eye(4) |
| 125 | + R_transform[:3, :3] = self.R_headset_world |
| 126 | + R_quat = tf.quaternion_from_matrix(R_transform) |
| 127 | + controller_quat = tf.quaternion_multiply( |
| 128 | + tf.quaternion_multiply(R_quat, controller_quat), tf.quaternion_conjugate(R_quat) |
| 129 | + ) |
| 130 | + |
| 131 | + if self.init_controller_xyz[arm_name] is None: |
| 132 | + self.init_controller_xyz[arm_name] = controller_xyz.copy() |
| 133 | + self.init_controller_quat[arm_name] = controller_quat.copy() |
| 134 | + delta_xyz = np.zeros(3) |
| 135 | + delta_rot = np.array([0.0, 0.0, 0.0]) |
| 136 | + else: |
| 137 | + delta_xyz = (controller_xyz - self.init_controller_xyz[arm_name]) * self.scale_factor |
| 138 | + delta_rot = quat_diff_as_angle_axis(self.init_controller_quat[arm_name], controller_quat) |
| 139 | + return delta_xyz, delta_rot |
| 140 | + |
| 141 | + def update_ik_and_publish(self): |
| 142 | + # Update placo state with the latest from hardware |
| 143 | + self.placo_robot.state.q[7:13] = self.left_controller.qpos |
| 144 | + self.placo_robot.update_kinematics() |
| 145 | + |
| 146 | + # Process XR inputs and update IK task (only one loop for the single arm) |
| 147 | + for arm_name, config in self.end_effector_config.items(): |
| 148 | + xr_grip_val = self.xr_client.get_key_value_by_name(config["control_trigger"]) |
| 149 | + active = xr_grip_val > (1.0 - CONTROLLER_DEADZONE) |
| 150 | + |
| 151 | + if active: |
| 152 | + if self.init_ee_xyz[arm_name] is None: |
| 153 | + self.init_ee_xyz[arm_name] = self.placo_robot.get_T_world_frame(config["link_name"])[:3, 3] |
| 154 | + self.init_ee_quat[arm_name] = tf.quaternion_from_matrix( |
| 155 | + self.placo_robot.get_T_world_frame(config["link_name"]) |
| 156 | + ) |
| 157 | + print(f"{arm_name} activated.") |
| 158 | + |
| 159 | + xr_pose = self.xr_client.get_pose_by_name(config["pose_source"]) |
| 160 | + delta_xyz, delta_rot = self._process_xr_pose(xr_pose, arm_name) |
| 161 | + target_xyz, target_quat = apply_delta_pose( |
| 162 | + self.init_ee_xyz[arm_name], self.init_ee_quat[arm_name], delta_xyz, delta_rot |
| 163 | + ) |
| 164 | + target_pose = tf.quaternion_matrix(target_quat) |
| 165 | + target_pose[:3, 3] = target_xyz |
| 166 | + self.effector_task[arm_name].T_world_frame = target_pose |
| 167 | + |
| 168 | + # Solve IK |
| 169 | + try: |
| 170 | + self.solver.solve(True) |
| 171 | + # Update target joint positions from IK solution |
| 172 | + self.left_controller.q_des = self.placo_robot.state.q[7:13].copy() |
| 173 | + |
| 174 | + if self.visualize_placo: |
| 175 | + self.placo_vis.display(self.placo_robot.state.q) |
| 176 | + for name in self.end_effector_config: |
| 177 | + frame_viz(f"vis_target_{name}", self.effector_task[name].T_world_frame) |
| 178 | + except RuntimeError as e: |
| 179 | + print(f"IK solver failed: {e}") |
| 180 | + except Exception as e: |
| 181 | + print(f"An unexpected error occurred in IK: {e}") |
| 182 | + |
| 183 | + else: |
| 184 | + if self.init_ee_xyz[arm_name] is not None: |
| 185 | + self.init_ee_xyz[arm_name] = None |
| 186 | + self.init_controller_xyz[arm_name] = None |
| 187 | + print(f"{arm_name} deactivated.") |
| 188 | + # Keep task target at current robot pose when not active |
| 189 | + self.effector_task[arm_name].T_world_frame = self.placo_robot.get_T_world_frame(config["link_name"]) |
| 190 | + # Update gripper position based on XR input |
| 191 | + if "gripper_config" in config: |
| 192 | + gripper_config = config["gripper_config"] |
| 193 | + trigger_value = self.xr_client.get_key_value_by_name(gripper_config["gripper_trigger"]) |
| 194 | + gripper_pos = calc_parallel_gripper_position( |
| 195 | + gripper_config["open_pos"], |
| 196 | + gripper_config["close_pos"], |
| 197 | + trigger_value, |
| 198 | + ) |
| 199 | + joint_name = gripper_config["joint_name"] |
| 200 | + self.gripper_pos_target[joint_name] = [gripper_pos] |
| 201 | + |
| 202 | + # if abs(self.gripper_pos_target["left_gripper_finger_joint1"][0] - self.left_controller.qpos_gripper[0]) > 0.01: |
| 203 | + self.left_controller.q_des_gripper = self.gripper_pos_target["left_gripper_finger_joint1"] |
| 204 | + self.left_controller.publish_gripper_control() |
| 205 | + # Publish commands to hardware |
| 206 | + self.left_controller.publish_arm_control() |
| 207 | + |
| 208 | + def run(self): |
| 209 | + print("Starting Galaxea A1X single arm teleop controller loop...") |
| 210 | + while not rospy.is_shutdown(): |
| 211 | + self.update_ik_and_publish() |
| 212 | + self.left_controller.rate.sleep() |
| 213 | + print("Galaxea teleop controller shutting down.") |
| 214 | + |
| 215 | + |
| 216 | +if __name__ == "__main__": |
| 217 | + # This is an example of how to run the controller |
| 218 | + # You would need to have your XR client and ROS environment set up |
| 219 | + try: |
| 220 | + xr_client = XrClient() |
| 221 | + controller = GalaxeaTeleopControllerSingleArm(xr_client=xr_client) |
| 222 | + controller.run() |
| 223 | + except rospy.ROSInterruptException: |
| 224 | + print("ROS node interrupted.") |
| 225 | + except Exception as e: |
| 226 | + print(f"An error occurred: {e}") |
0 commit comments