11import os
2+ import threading
23import time
34import webbrowser
45
1819from xrobotoolkit_teleop .utils .path_utils import ASSET_PATH
1920from xrobotoolkit_teleop .utils .xr_client import XrClient
2021
21- # Default paths and configurations for a single left arm
22+ # Default paths and configurations for a single right arm
2223DEFAULT_SINGLE_A1X_URDF_PATH = os .path .join (ASSET_PATH , "galaxea/A1X/a1x.urdf" )
2324DEFAULT_SCALE_FACTOR = 1.0
2425CONTROLLER_DEADZONE = 0.1
2526
26- # Default end-effector configuration for a single left arm without a gripper
27+ # Default end-effector configuration for a single right arm without a gripper
2728DEFAULT_END_EFFECTOR_CONFIG = {
28- "left_arm " : {
29+ "right_arm " : {
2930 "link_name" : "gripper_link" , # URDF link name for the single arm
30- "pose_source" : "left_controller " ,
31- "control_trigger" : "left_grip " ,
31+ "pose_source" : "right_controller " ,
32+ "control_trigger" : "right_grip " ,
3233 "gripper_config" : {
33- "joint_name" : "left_gripper_finger_joint1 " ,
34- "gripper_trigger" : "left_trigger " ,
34+ "joint_name" : "right_gripper_finger_joint1 " ,
35+ "gripper_trigger" : "right_trigger " ,
3536 "open_pos" : - 2.0 ,
3637 "close_pos" : 0.0 ,
3738 },
3839 },
3940}
4041
4142
42- class GalaxeaTeleopControllerSingleArm :
43+ class GalaxeaA1XTeleopController :
4344 def __init__ (
4445 self ,
4546 xr_client : XrClient ,
@@ -50,31 +51,22 @@ def __init__(
5051 visualize_placo : bool = True ,
5152 ros_rate_hz : int = 100 ,
5253 ):
53- # rospy.init_node("galaxea_teleop_controller_single", anonymous=True)
5454
5555 self .xr_client = xr_client
5656 self .robot_urdf_path = robot_urdf_path
5757 self .R_headset_world = R_headset_world
5858 self .scale_factor = scale_factor
5959 self .visualize_placo = visualize_placo
6060 self .end_effector_config = end_effector_config
61- # self.rate = rospy.Rate(ros_rate_hz)
6261
63- # Define joint names for the single left arm (no gripper)
64- # left_joint_names = [f"arm_joint_{i}" for i in range(1, 7)]
62+ self .right_controller = A1XController ()
6563
66- # Instantiate hardware controller for the left arm
67- # Assuming the single arm uses the "left" prefix for its topics
68- self .left_controller = A1XController ()
69-
70- # Placo Setup
7164 self .placo_robot = placo .RobotWrapper (self .robot_urdf_path )
7265 self .solver = placo .KinematicsSolver (self .placo_robot )
7366 self .solver .dt = 1.0 / ros_rate_hz
7467 self .solver .mask_fbase (True )
7568 self .solver .add_kinetic_energy_regularization_task (1e-6 )
7669
77- # Setup Placo tasks for the single arm
7870 self .effector_task = {}
7971 self .init_ee_xyz = {}
8072 self .init_ee_quat = {}
@@ -97,14 +89,12 @@ def __init__(
9789 gripper_config = config ["gripper_config" ]
9890 self .gripper_pos_target [gripper_config ["joint_name" ]] = gripper_config ["open_pos" ]
9991
100- # Wait for the first joint state message to initialize Placo state
10192 print ("Waiting for initial joint state from the Galaxea arm..." )
102- while not rospy .is_shutdown () and self .left_controller .timestamp == 0 :
93+ while not rospy .is_shutdown () and self .right_controller .timestamp == 0 :
10394 rospy .sleep (0.1 )
10495 print ("Initial joint state received." )
10596
106- # Set initial Placo state from hardware. Assumes single arm URDF has 7 DoF base + 6 DoF arm
107- self .placo_robot .state .q [7 :13 ] = self .left_controller .qpos
97+ self .placo_robot .state .q [7 :13 ] = self .right_controller .qpos
10898
10999 if self .visualize_placo :
110100 self .placo_robot .update_kinematics ()
@@ -118,7 +108,7 @@ def __init__(
118108
119109 def _process_xr_pose (self , xr_pose , arm_name : str ):
120110 controller_xyz = np .array ([xr_pose [0 ], xr_pose [1 ], xr_pose [2 ]])
121- controller_quat = np .array ([xr_pose [6 ], xr_pose [3 ], xr_pose [4 ], xr_pose [5 ]]) # w, x, y, z
111+ controller_quat = np .array ([xr_pose [6 ], xr_pose [3 ], xr_pose [4 ], xr_pose [5 ]])
122112
123113 controller_xyz = self .R_headset_world @ controller_xyz
124114 R_transform = np .eye (4 )
@@ -138,12 +128,10 @@ def _process_xr_pose(self, xr_pose, arm_name: str):
138128 delta_rot = quat_diff_as_angle_axis (self .init_controller_quat [arm_name ], controller_quat )
139129 return delta_xyz , delta_rot
140130
141- def update_ik_and_publish (self ):
142- # Update placo state with the latest from hardware
143- self .placo_robot .state .q [7 :13 ] = self .left_controller .qpos
131+ def update_ik (self ):
132+ self .placo_robot .state .q [7 :13 ] = self .right_controller .qpos
144133 self .placo_robot .update_kinematics ()
145134
146- # Process XR inputs and update IK task (only one loop for the single arm)
147135 for arm_name , config in self .end_effector_config .items ():
148136 xr_grip_val = self .xr_client .get_key_value_by_name (config ["control_trigger" ])
149137 active = xr_grip_val > (1.0 - CONTROLLER_DEADZONE )
@@ -165,11 +153,9 @@ def update_ik_and_publish(self):
165153 target_pose [:3 , 3 ] = target_xyz
166154 self .effector_task [arm_name ].T_world_frame = target_pose
167155
168- # Solve IK
169156 try :
170157 self .solver .solve (True )
171- # Update target joint positions from IK solution
172- self .left_controller .q_des = self .placo_robot .state .q [7 :13 ].copy ()
158+ self .right_controller .q_des = self .placo_robot .state .q [7 :13 ].copy ()
173159
174160 if self .visualize_placo :
175161 self .placo_vis .display (self .placo_robot .state .q )
@@ -185,8 +171,8 @@ def update_ik_and_publish(self):
185171 self .init_ee_xyz [arm_name ] = None
186172 self .init_controller_xyz [arm_name ] = None
187173 print (f"{ arm_name } deactivated." )
188- # Keep task target at current robot pose when not active
189174 self .effector_task [arm_name ].T_world_frame = self .placo_robot .get_T_world_frame (config ["link_name" ])
175+
190176 # Update gripper position based on XR input
191177 if "gripper_config" in config :
192178 gripper_config = config ["gripper_config" ]
@@ -199,28 +185,23 @@ def update_ik_and_publish(self):
199185 joint_name = gripper_config ["joint_name" ]
200186 self .gripper_pos_target [joint_name ] = [gripper_pos ]
201187
202- # if abs(self.gripper_pos_target["left_gripper_finger_joint1"][0] - self.left_controller.qpos_gripper[0]) > 0.01:
203- self .left_controller .q_des_gripper = self .gripper_pos_target ["left_gripper_finger_joint1" ]
204- self .left_controller .publish_gripper_control ()
205- # Publish commands to hardware
206- self .left_controller .publish_arm_control ()
188+ self .right_controller .q_des_gripper = self .gripper_pos_target [
189+ self .end_effector_config ["right_arm" ]["gripper_config" ]["joint_name" ]
190+ ]
191+
192+ def run_ik_thread (self , stop_event : threading .Event ):
193+ print ("Starting Galaxea A1X single arm teleop controller IK loop..." )
194+ while not (rospy .is_shutdown () or stop_event .is_set ()):
195+ start = time .time ()
196+ self .update_ik ()
197+ elapsed = time .time () - start
198+ time .sleep (max (0 , (self .solver .dt - elapsed )))
199+ print ("Galaxea teleop controller IK loop shutting down." )
207200
208- def run (self ):
201+ def run_control_thread (self , stop_event : threading . Event ):
209202 print ("Starting Galaxea A1X single arm teleop controller loop..." )
210- while not rospy .is_shutdown ():
211- self .update_ik_and_publish ()
212- self .left_controller .rate .sleep ()
203+ while not (rospy .is_shutdown () or stop_event .is_set ()):
204+ self .right_controller .publish_arm_control ()
205+ self .right_controller .publish_gripper_control ()
206+ self .right_controller .rate .sleep ()
213207 print ("Galaxea teleop controller shutting down." )
214-
215-
216- if __name__ == "__main__" :
217- # This is an example of how to run the controller
218- # You would need to have your XR client and ROS environment set up
219- try :
220- xr_client = XrClient ()
221- controller = GalaxeaTeleopControllerSingleArm (xr_client = xr_client )
222- controller .run ()
223- except rospy .ROSInterruptException :
224- print ("ROS node interrupted." )
225- except Exception as e :
226- print (f"An error occurred: { e } " )
0 commit comments