diff --git a/robosuite/controllers/parts/controller.py b/robosuite/controllers/parts/controller.py index 2b3b4f58b7..6fe772d8de 100644 --- a/robosuite/controllers/parts/controller.py +++ b/robosuite/controllers/parts/controller.py @@ -122,6 +122,9 @@ def __init__( # Torques being outputted by the controller self.torques = None + # External torque compensation + self.external_torque_compensation = 0 + # Update flag to prevent redundant update calls self.new_update = True diff --git a/robosuite/controllers/parts/generic/joint_pos.py b/robosuite/controllers/parts/generic/joint_pos.py index 7665679157..42dc03b038 100644 --- a/robosuite/controllers/parts/generic/joint_pos.py +++ b/robosuite/controllers/parts/generic/joint_pos.py @@ -175,7 +175,9 @@ def __init__( # initialize self.goal_qpos = None + self.desired_torque_as_acceleration = kwargs.get("desired_torque_as_acceleration", True) self.use_torque_compensation = kwargs.get("use_torque_compensation", True) + self.use_external_torque_compensation = kwargs.get("use_external_torque_compensation", False) def set_goal(self, action, set_qpos=None): """ @@ -259,13 +261,19 @@ def run_controller(self): position_error = desired_qpos - self.joint_pos vel_pos_error = -self.joint_vel - desired_torque = np.multiply(np.array(position_error), np.array(self.kp)) + np.multiply(vel_pos_error, self.kd) + self.torques = np.multiply(np.array(position_error), np.array(self.kp)) + np.multiply(vel_pos_error, self.kd) - # Return desired torques plus gravity compensations + # The computed self.torques may represent acceleration rather than actual torques, + # depending on the meaning of PD gains. + # This usage is used to maintain compatibility with other controllers in robosuite. + if self.desired_torque_as_acceleration: + self.torques = np.dot(self.mass_matrix, self.torques) + + # Return desired torques plus gravity compensation if self.use_torque_compensation: - self.torques = np.dot(self.mass_matrix, desired_torque) + self.torque_compensation - else: - self.torques = desired_torque + self.torques += self.torque_compensation + elif self.use_external_torque_compensation: + self.torques += self.external_torque_compensation # Always run superclass call for any cleanups at the end super().run_controller() diff --git a/robosuite/environments/base.py b/robosuite/environments/base.py index c0e77b2918..0072961cb9 100644 --- a/robosuite/environments/base.py +++ b/robosuite/environments/base.py @@ -375,24 +375,30 @@ def unset_ep_meta(self): """ self._ep_meta = {} - def _update_observables(self, force=False): + def _update_observables(self, force=False, timestep=None): """ Updates all observables in this environment Args: force (bool): If True, will force all the observables to update their internal values to the newest value. This is useful if, e.g., you want to grab observations when directly setting simulation states without actually stepping the simulation. + timestep (float): If specified, will update the observables with the given timestep. Sometimes, we only + want to get the current time step of the observables without interrupt the observable update process. """ for observable in self._observables.values(): - observable.update(timestep=self.model_timestep, obs_cache=self._obs_cache, force=force) + observable.update( + timestep=self.model_timestep if timestep is None else timestep, obs_cache=self._obs_cache, force=force + ) - def _get_observations(self, force_update=False): + def _get_observations(self, force_update=False, timestep=None): """ Grabs observations from the environment. Args: force_update (bool): If True, will force all the observables to update their internal values to the newest value. This is useful if, e.g., you want to grab observations when directly setting simulation states without actually stepping the simulation. + timestep (float): If specified, will update the observables with the given timestep. Sometimes, we only + want to get the current time step of the observables without interrupt the observable update process. Returns: OrderedDict: OrderedDict containing observations [(name_string, np.array), ...] """ @@ -401,7 +407,7 @@ def _get_observations(self, force_update=False): # Force an update if requested if force_update: - self._update_observables(force=True) + self._update_observables(force=True, timestep=timestep) # Loop through all observables and grab their current observation for obs_name, observable in self._observables.items(): diff --git a/robosuite/robots/robot.py b/robosuite/robots/robot.py index 7ba93b0e10..665053d4cd 100644 --- a/robosuite/robots/robot.py +++ b/robosuite/robots/robot.py @@ -1,9 +1,9 @@ import copy import json -import os from collections import OrderedDict from typing import Optional +import mujoco import numpy as np import robosuite.utils.transform_utils as T @@ -373,6 +373,14 @@ def joint_acc(obs_cache): names += arm_sensor_names actives += [True] * len(arm_sensors) + if hasattr(self.robot_model, "torso_body"): + torso_sensors, torso_sensor_names = self._create_torso_sensors( + self.robot_model.torso_body, modality=modality + ) + sensors += torso_sensors + names += torso_sensor_names + actives += [True] * len(torso_sensors) + base_sensors, base_sensor_names = self._create_base_sensors(modality=modality) sensors += base_sensors names += base_sensor_names @@ -476,9 +484,48 @@ def gripper_qpos(obs_cache): def gripper_qvel(obs_cache): return np.array([self.sim.data.qvel[x] for x in self._ref_gripper_joint_vel_indexes[arm]]) - sensors += [gripper_qpos, gripper_qvel] - names += [f"{pf}gripper_qpos", f"{pf}gripper_qvel"] + @sensor(modality=modality) + def gripper_qacc(obs_cache): + return np.array([self.sim.data.qacc[x] for x in self._ref_gripper_joint_vel_indexes[arm]]) + + sensors += [gripper_qpos, gripper_qvel, gripper_qacc] + names += [f"{pf}gripper_qpos", f"{pf}gripper_qvel", f"{pf}gripper_qacc"] + + return sensors, names + + def _create_torso_sensors(self, torso_body, modality): + """ + Helper function to create sensors for the robot torso. This will be + overriden by subclasses. + """ + sensors = [] + names = [] + + for torso_name in torso_body: + pf = self.robot_model.naming_prefix + torso_index = self.sim.model.body_name2id(f"{pf}{torso_name}") + @sensor(modality=modality) + def torso_imu_quat(obs_cache): + return self.sim.data.xquat[torso_index] + + @sensor(modality=modality) + def torso_imu_vel(obs_cache): + pose = np.zeros(6) + mujoco.mj_objectVelocity( + self.sim.model._model, + self.sim.data._data, + mujoco.mjtObj.mjOBJ_BODY, + torso_index, + pose, + 1, + ) # 1 for local frame + # Reorder velocity from [ang, lin] to [lin, ang] + pose[:3], pose[3:6] = pose[3:6].copy(), pose[:3].copy() + return pose + + sensors += [torso_imu_quat, torso_imu_vel] + names += [f"{torso_name}_imu_quat", f"{torso_name}_imu_vel"] return sensors, names def _create_base_sensors(self, modality):