Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions examples/openarms/gravity_compensation_one_arm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import time
import numpy as np
import pinocchio as pin
from os.path import join, dirname, exists, expanduser

from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig


def main() -> None:
config = OpenArmsFollowerConfig(
port_right="can0",
port_left="can1",
can_interface="socketcan",
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=5.0,
)


print("Initializing robot...")
follower = OpenArmsFollower(config)
follower.connect(calibrate=True)

# Load URDF for Pinocchio dynamics
urdf_path = "/home/croissant/Documents/openarm_description/openarm_bimanual_pybullet.urdf"

pin_robot = pin.RobotWrapper.BuildFromURDF(urdf_path, dirname(urdf_path))
pin_robot.data = pin_robot.model.createData()
print(f"✓ Loaded Pinocchio model with {pin_robot.nq} DoFs")

follower.pin_robot = pin_robot

control_arm = "right"

print(f"Applying gravity compensation to: {control_arm.upper()} ARM (all joints)")
print("\nAll joints on the right arm will have gravity compensation applied.")
print("Left arm joints will be disabled (free to move).")
print("\nIMPORTANT:")
print(" 1. Support the arm before starting")
print(" 2. The arm will be held in place by gravity compensation")
print(" 3. You should be able to move it with gentle force")
print("\nPress ENTER when ready to start...")
input()

follower.bus_left.enable_torque()
time.sleep(0.1)

print(f"✓ Motors enabled")
print(f" {control_arm.upper()} arm: Gravity compensation active")
print(f" LEFT arm: Zero torque (free to move)")

print("\nStarting gravity compensation loop...")
print("Press Ctrl+C to stop\n")

loop_times = []
last_print_time = time.perf_counter()

try:
while True:
loop_start = time.perf_counter()

# Get current joint positions from robot
obs = follower.get_observation()

# Extract positions in degrees
positions_deg = {}
for motor in follower.bus_right.motors:
key = f"right_{motor}.pos"
if key in obs:
positions_deg[f"right_{motor}"] = obs[key]

for motor in follower.bus_left.motors:
key = f"left_{motor}.pos"
if key in obs:
positions_deg[f"left_{motor}"] = obs[key]

# Convert to radians and calculate gravity torques
# Use the built-in method from OpenArmsFollower
positions_rad = {k: np.deg2rad(v) for k, v in positions_deg.items()}
torques_nm = follower._gravity_from_q(positions_rad)

# Apply gravity compensation torques to all joints on the right arm
for motor in follower.bus_right.motors:
full_name = f"right_{motor}"
position = positions_deg.get(full_name, 0.0)

# Apply gravity compensation to this joint
torque = torques_nm.get(full_name, 0.0)

# Send MIT control command with gravity compensation torque
follower.bus_right._mit_control(
motor=motor,
kp=0.0, # No position control
kd=0.0, # No velocity damping
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque
)

# Also ensure left arm joints have zero torque
for motor in follower.bus_left.motors:
full_name = f"left_{motor}"
position = positions_deg.get(full_name, 0.0)

follower.bus_left._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=0.0
)

# Measure loop time
loop_end = time.perf_counter()
loop_time = loop_end - loop_start
loop_times.append(loop_time)

# Print status every 2 seconds
if loop_end - last_print_time >= 2.0:
if loop_times:
avg_time = sum(loop_times) / len(loop_times)
current_hz = 1.0 / avg_time if avg_time > 0 else 0

# Display status for all joints on the controlled arm
print(f"\n[RIGHT ARM] Loop: {current_hz:.1f} Hz ({avg_time*1000:.1f} ms)")

# Show each joint's position and torque
for motor in follower.bus_right.motors:
full_name = f"right_{motor}"
pos = positions_deg.get(full_name, 0.0)
torque = torques_nm.get(full_name, 0.0)
print(f" {motor:8s}: Pos={pos:7.1f}° | Torque={torque:7.3f} N·m")

# Reset for next measurement window
loop_times = []
last_print_time = loop_end

# Small sleep to avoid overwhelming the CAN bus
time.sleep(0.01) # 100 Hz

except KeyboardInterrupt:
print("\n\nStopping gravity compensation...")

finally:
print("\nDisabling all motors and disconnecting...")
follower.bus_right.disable_torque()
follower.bus_left.disable_torque()
time.sleep(0.1)
follower.disconnect()
print("✓ Safe shutdown complete")


if __name__ == "__main__":
main()

232 changes: 232 additions & 0 deletions examples/openarms/teleop_with_gravity_compensation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
"""
OpenArms Teleoperation with Gravity Compensation

Leader RIGHT arm: Gravity compensation (weightless, easy to move)
Follower RIGHT arm: Mirrors leader movements
Both LEFT arms: Free to move (disabled)

The urdf file tested with this script is found in:
https://github.com/michel-aractingi/openarm_description/blob/main/openarm_bimanual_pybullet.urdf
"""

import time
import os

import numpy as np
import pinocchio as pin

from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader

# Path to the URDF file
URDF_PATH = "/home/croissant/Documents/openarm_description/openarm_bimanual_pybullet.urdf"

def compute_gravity_torques(leader, positions_rad):
"""
Compute gravity torques for all joints in the robot.

Args:
leader: OpenArmsLeader instance with pin_robot set
positions_rad: Dictionary mapping motor names (with arm prefix) to positions in radians

Returns:
Dictionary mapping motor names to gravity torques in N·m
"""
if not hasattr(leader, "pin_robot") or leader.pin_robot is None:
raise RuntimeError("URDF model not loaded on leader")

# Build position vector in the order of motors (right arm, then left arm)
q = np.zeros(leader.pin_robot.model.nq)
idx = 0

# Right arm motors
for motor_name in leader.bus_right.motors:
full_name = f"right_{motor_name}"
q[idx] = positions_rad.get(full_name, 0.0)
idx += 1

# Left arm motors
for motor_name in leader.bus_left.motors:
full_name = f"left_{motor_name}"
q[idx] = positions_rad.get(full_name, 0.0)
idx += 1

# Compute generalized gravity vector
g = pin.computeGeneralizedGravity(leader.pin_robot.model, leader.pin_robot.data, q)

# Map back to motor names
result = {}
idx = 0
for motor_name in leader.bus_right.motors:
result[f"right_{motor_name}"] = float(g[idx])
idx += 1
for motor_name in leader.bus_left.motors:
result[f"left_{motor_name}"] = float(g[idx])
idx += 1

return result


def main():
"""Main teleoperation loop with gravity compensation"""

print("=" * 70)
print("OpenArms Teleoperation with Gravity Compensation")
print("=" * 70)

# Configuration
follower_config = OpenArmsFollowerConfig(
port_right="can0",
port_left="can1",
can_interface="socketcan",
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=10.0,
)

leader_config = OpenArmsLeaderConfig(
port_right="can2",
port_left="can3",
can_interface="socketcan",
id="openarms_leader",
manual_control=False, # Enable torque control for gravity compensation
)

# Initialize and connect
print("\nInitializing devices...")
follower = OpenArmsFollower(follower_config)
leader = OpenArmsLeader(leader_config)

follower.connect(calibrate=True)
leader.connect(calibrate=True)

# Load URDF for gravity compensation
if not os.path.exists(URDF_PATH):
raise FileNotFoundError(f"URDF file not found at {URDF_PATH}")
pin_robot = pin.RobotWrapper.BuildFromURDF(URDF_PATH, os.path.dirname(URDF_PATH))
pin_robot.data = pin_robot.model.createData()
leader.pin_robot = pin_robot

print("\nLeader RIGHT: G-comp | Follower RIGHT: Teleop")
print("Press ENTER to start...")
input()

# Enable motors
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)

print("Press Ctrl+C to stop\n")

# Main control loop
loop_times = []
last_print_time = time.perf_counter()

# Right arm joints only
right_joints = [
"right_joint_1",
"right_joint_2",
"right_joint_3",
"right_joint_4",
"right_joint_5",
"right_joint_6",
"right_joint_7",
"right_gripper",
]

try:
while True:
loop_start = time.perf_counter()

# Get leader state
leader_action = leader.get_action()

leader_positions_deg = {}
for motor in leader.bus_right.motors:
key = f"right_{motor}.pos"
if key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[key]

for motor in leader.bus_left.motors:
key = f"left_{motor}.pos"
if key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[key]

# Calculate gravity torques for leader
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
leader_torques_nm = compute_gravity_torques(leader, leader_positions_rad)

# Apply gravity compensation to leader right arm
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_torques_nm.get(full_name, 0.0)

leader.bus_right._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)

# Keep leader left arm free
for motor in leader.bus_left.motors:
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)

leader.bus_left._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=0.0,
)

# Send leader positions to follower right arm
follower_action = {}
for joint in right_joints:
pos_key = f"{joint}.pos"
if pos_key in leader_action:
follower_action[pos_key] = leader_action[pos_key]

if follower_action:
follower.send_action(follower_action)

# Performance monitoring
loop_end = time.perf_counter()
loop_time = loop_end - loop_start
loop_times.append(loop_time)

if loop_end - last_print_time >= 2.0:
if loop_times:
avg_time = sum(loop_times) / len(loop_times)
current_hz = 1.0 / avg_time if avg_time > 0 else 0
sample_pos = leader_positions_deg.get("right_joint_2", 0.0)
sample_torque = leader_torques_nm.get("right_joint_2", 0.0)

print(f"[{current_hz:.1f} Hz] J2: {sample_pos:5.1f}° | Torque: {sample_torque:5.2f} N·m")

loop_times = []
last_print_time = loop_end

except KeyboardInterrupt:
print("\n\nStopping...")
finally:
try:
leader.bus_right.disable_torque()
leader.bus_left.disable_torque()
time.sleep(0.1)
leader.disconnect()
follower.disconnect()
print("✓ Shutdown complete")
except Exception as e:
print(f"Shutdown error: {e}")


if __name__ == "__main__":
main()