diff --git a/examples/openarms/gravity_compensation_one_arm.py b/examples/openarms/gravity_compensation_one_arm.py new file mode 100755 index 0000000000..dfdca19f9b --- /dev/null +++ b/examples/openarms/gravity_compensation_one_arm.py @@ -0,0 +1,157 @@ +import time +import numpy as np +import pinocchio as pin +from os.path import join, dirname, exists, expanduser + +from lerobot.robots.openarms.openarms_follower import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig + + +def main() -> None: + config = OpenArmsFollowerConfig( + port_right="can0", + port_left="can1", + can_interface="socketcan", + id="openarms_follower", + disable_torque_on_disconnect=True, + max_relative_target=5.0, + ) + + + print("Initializing robot...") + follower = OpenArmsFollower(config) + follower.connect(calibrate=True) + + # Load URDF for Pinocchio dynamics + urdf_path = "/home/croissant/Documents/openarm_description/openarm_bimanual_pybullet.urdf" + + pin_robot = pin.RobotWrapper.BuildFromURDF(urdf_path, dirname(urdf_path)) + pin_robot.data = pin_robot.model.createData() + print(f"✓ Loaded Pinocchio model with {pin_robot.nq} DoFs") + + follower.pin_robot = pin_robot + + control_arm = "right" + + print(f"Applying gravity compensation to: {control_arm.upper()} ARM (all joints)") + print("\nAll joints on the right arm will have gravity compensation applied.") + print("Left arm joints will be disabled (free to move).") + print("\nIMPORTANT:") + print(" 1. Support the arm before starting") + print(" 2. The arm will be held in place by gravity compensation") + print(" 3. You should be able to move it with gentle force") + print("\nPress ENTER when ready to start...") + input() + + follower.bus_left.enable_torque() + time.sleep(0.1) + + print(f"✓ Motors enabled") + print(f" {control_arm.upper()} arm: Gravity compensation active") + print(f" LEFT arm: Zero torque (free to move)") + + print("\nStarting gravity compensation loop...") + print("Press Ctrl+C to stop\n") + + loop_times = [] + last_print_time = time.perf_counter() + + try: + while True: + loop_start = time.perf_counter() + + # Get current joint positions from robot + obs = follower.get_observation() + + # Extract positions in degrees + positions_deg = {} + for motor in follower.bus_right.motors: + key = f"right_{motor}.pos" + if key in obs: + positions_deg[f"right_{motor}"] = obs[key] + + for motor in follower.bus_left.motors: + key = f"left_{motor}.pos" + if key in obs: + positions_deg[f"left_{motor}"] = obs[key] + + # Convert to radians and calculate gravity torques + # Use the built-in method from OpenArmsFollower + positions_rad = {k: np.deg2rad(v) for k, v in positions_deg.items()} + torques_nm = follower._gravity_from_q(positions_rad) + + # Apply gravity compensation torques to all joints on the right arm + for motor in follower.bus_right.motors: + full_name = f"right_{motor}" + position = positions_deg.get(full_name, 0.0) + + # Apply gravity compensation to this joint + torque = torques_nm.get(full_name, 0.0) + + # Send MIT control command with gravity compensation torque + follower.bus_right._mit_control( + motor=motor, + kp=0.0, # No position control + kd=0.0, # No velocity damping + position_degrees=position, + velocity_deg_per_sec=0.0, + torque=torque + ) + + # Also ensure left arm joints have zero torque + for motor in follower.bus_left.motors: + full_name = f"left_{motor}" + position = positions_deg.get(full_name, 0.0) + + follower.bus_left._mit_control( + motor=motor, + kp=0.0, + kd=0.0, + position_degrees=position, + velocity_deg_per_sec=0.0, + torque=0.0 + ) + + # Measure loop time + loop_end = time.perf_counter() + loop_time = loop_end - loop_start + loop_times.append(loop_time) + + # Print status every 2 seconds + if loop_end - last_print_time >= 2.0: + if loop_times: + avg_time = sum(loop_times) / len(loop_times) + current_hz = 1.0 / avg_time if avg_time > 0 else 0 + + # Display status for all joints on the controlled arm + print(f"\n[RIGHT ARM] Loop: {current_hz:.1f} Hz ({avg_time*1000:.1f} ms)") + + # Show each joint's position and torque + for motor in follower.bus_right.motors: + full_name = f"right_{motor}" + pos = positions_deg.get(full_name, 0.0) + torque = torques_nm.get(full_name, 0.0) + print(f" {motor:8s}: Pos={pos:7.1f}° | Torque={torque:7.3f} N·m") + + # Reset for next measurement window + loop_times = [] + last_print_time = loop_end + + # Small sleep to avoid overwhelming the CAN bus + time.sleep(0.01) # 100 Hz + + except KeyboardInterrupt: + print("\n\nStopping gravity compensation...") + + finally: + print("\nDisabling all motors and disconnecting...") + follower.bus_right.disable_torque() + follower.bus_left.disable_torque() + time.sleep(0.1) + follower.disconnect() + print("✓ Safe shutdown complete") + + +if __name__ == "__main__": + main() + diff --git a/examples/openarms/teleop_with_gravity_compensation.py b/examples/openarms/teleop_with_gravity_compensation.py new file mode 100755 index 0000000000..a260f1a2af --- /dev/null +++ b/examples/openarms/teleop_with_gravity_compensation.py @@ -0,0 +1,232 @@ +""" +OpenArms Teleoperation with Gravity Compensation + +Leader RIGHT arm: Gravity compensation (weightless, easy to move) +Follower RIGHT arm: Mirrors leader movements +Both LEFT arms: Free to move (disabled) + +The urdf file tested with this script is found in: +https://github.com/michel-aractingi/openarm_description/blob/main/openarm_bimanual_pybullet.urdf +""" + +import time +import os + +import numpy as np +import pinocchio as pin + +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig +from lerobot.robots.openarms.openarms_follower import OpenArmsFollower +from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig +from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader + +# Path to the URDF file +URDF_PATH = "/home/croissant/Documents/openarm_description/openarm_bimanual_pybullet.urdf" + +def compute_gravity_torques(leader, positions_rad): + """ + Compute gravity torques for all joints in the robot. + + Args: + leader: OpenArmsLeader instance with pin_robot set + positions_rad: Dictionary mapping motor names (with arm prefix) to positions in radians + + Returns: + Dictionary mapping motor names to gravity torques in N·m + """ + if not hasattr(leader, "pin_robot") or leader.pin_robot is None: + raise RuntimeError("URDF model not loaded on leader") + + # Build position vector in the order of motors (right arm, then left arm) + q = np.zeros(leader.pin_robot.model.nq) + idx = 0 + + # Right arm motors + for motor_name in leader.bus_right.motors: + full_name = f"right_{motor_name}" + q[idx] = positions_rad.get(full_name, 0.0) + idx += 1 + + # Left arm motors + for motor_name in leader.bus_left.motors: + full_name = f"left_{motor_name}" + q[idx] = positions_rad.get(full_name, 0.0) + idx += 1 + + # Compute generalized gravity vector + g = pin.computeGeneralizedGravity(leader.pin_robot.model, leader.pin_robot.data, q) + + # Map back to motor names + result = {} + idx = 0 + for motor_name in leader.bus_right.motors: + result[f"right_{motor_name}"] = float(g[idx]) + idx += 1 + for motor_name in leader.bus_left.motors: + result[f"left_{motor_name}"] = float(g[idx]) + idx += 1 + + return result + + +def main(): + """Main teleoperation loop with gravity compensation""" + + print("=" * 70) + print("OpenArms Teleoperation with Gravity Compensation") + print("=" * 70) + + # Configuration + follower_config = OpenArmsFollowerConfig( + port_right="can0", + port_left="can1", + can_interface="socketcan", + id="openarms_follower", + disable_torque_on_disconnect=True, + max_relative_target=10.0, + ) + + leader_config = OpenArmsLeaderConfig( + port_right="can2", + port_left="can3", + can_interface="socketcan", + id="openarms_leader", + manual_control=False, # Enable torque control for gravity compensation + ) + + # Initialize and connect + print("\nInitializing devices...") + follower = OpenArmsFollower(follower_config) + leader = OpenArmsLeader(leader_config) + + follower.connect(calibrate=True) + leader.connect(calibrate=True) + + # Load URDF for gravity compensation + if not os.path.exists(URDF_PATH): + raise FileNotFoundError(f"URDF file not found at {URDF_PATH}") + pin_robot = pin.RobotWrapper.BuildFromURDF(URDF_PATH, os.path.dirname(URDF_PATH)) + pin_robot.data = pin_robot.model.createData() + leader.pin_robot = pin_robot + + print("\nLeader RIGHT: G-comp | Follower RIGHT: Teleop") + print("Press ENTER to start...") + input() + + # Enable motors + leader.bus_right.enable_torque() + leader.bus_left.enable_torque() + time.sleep(0.1) + + print("Press Ctrl+C to stop\n") + + # Main control loop + loop_times = [] + last_print_time = time.perf_counter() + + # Right arm joints only + right_joints = [ + "right_joint_1", + "right_joint_2", + "right_joint_3", + "right_joint_4", + "right_joint_5", + "right_joint_6", + "right_joint_7", + "right_gripper", + ] + + try: + while True: + loop_start = time.perf_counter() + + # Get leader state + leader_action = leader.get_action() + + leader_positions_deg = {} + for motor in leader.bus_right.motors: + key = f"right_{motor}.pos" + if key in leader_action: + leader_positions_deg[f"right_{motor}"] = leader_action[key] + + for motor in leader.bus_left.motors: + key = f"left_{motor}.pos" + if key in leader_action: + leader_positions_deg[f"left_{motor}"] = leader_action[key] + + # Calculate gravity torques for leader + leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()} + leader_torques_nm = compute_gravity_torques(leader, leader_positions_rad) + + # Apply gravity compensation to leader right arm + for motor in leader.bus_right.motors: + full_name = f"right_{motor}" + position = leader_positions_deg.get(full_name, 0.0) + torque = leader_torques_nm.get(full_name, 0.0) + + leader.bus_right._mit_control( + motor=motor, + kp=0.0, + kd=0.0, + position_degrees=position, + velocity_deg_per_sec=0.0, + torque=torque, + ) + + # Keep leader left arm free + for motor in leader.bus_left.motors: + full_name = f"left_{motor}" + position = leader_positions_deg.get(full_name, 0.0) + + leader.bus_left._mit_control( + motor=motor, + kp=0.0, + kd=0.0, + position_degrees=position, + velocity_deg_per_sec=0.0, + torque=0.0, + ) + + # Send leader positions to follower right arm + follower_action = {} + for joint in right_joints: + pos_key = f"{joint}.pos" + if pos_key in leader_action: + follower_action[pos_key] = leader_action[pos_key] + + if follower_action: + follower.send_action(follower_action) + + # Performance monitoring + loop_end = time.perf_counter() + loop_time = loop_end - loop_start + loop_times.append(loop_time) + + if loop_end - last_print_time >= 2.0: + if loop_times: + avg_time = sum(loop_times) / len(loop_times) + current_hz = 1.0 / avg_time if avg_time > 0 else 0 + sample_pos = leader_positions_deg.get("right_joint_2", 0.0) + sample_torque = leader_torques_nm.get("right_joint_2", 0.0) + + print(f"[{current_hz:.1f} Hz] J2: {sample_pos:5.1f}° | Torque: {sample_torque:5.2f} N·m") + + loop_times = [] + last_print_time = loop_end + + except KeyboardInterrupt: + print("\n\nStopping...") + finally: + try: + leader.bus_right.disable_torque() + leader.bus_left.disable_torque() + time.sleep(0.1) + leader.disconnect() + follower.disconnect() + print("✓ Shutdown complete") + except Exception as e: + print(f"Shutdown error: {e}") + + +if __name__ == "__main__": + main()