Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions robosuite/controllers/parts/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def __init__(
# Torques being outputted by the controller
self.torques = None

# External torque compensation
self.external_torque_compensation = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I am missing something, but right now it looks like there isnt any way to pass in any custom value for external_torque_compensation right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

G1 will use this flag to make sure that the gravity compensation is the same between sim and real.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how will you set the value of the flag? right now it seems like there is no way to pass in any values to the class?


# Update flag to prevent redundant update calls
self.new_update = True

Expand Down
18 changes: 13 additions & 5 deletions robosuite/controllers/parts/generic/joint_pos.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ def __init__(
# initialize
self.goal_qpos = None

self.desired_torque_as_acceleration = kwargs.get("desired_torque_as_acceleration", True)
self.use_torque_compensation = kwargs.get("use_torque_compensation", True)
self.use_external_torque_compensation = kwargs.get("use_external_torque_compensation", False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be possible to add in desired_torque_as_acceleration and use_external_torque_compensation as parameters instead of using the **kwargs?


def set_goal(self, action, set_qpos=None):
"""
Expand Down Expand Up @@ -259,13 +261,19 @@ def run_controller(self):

position_error = desired_qpos - self.joint_pos
vel_pos_error = -self.joint_vel
desired_torque = np.multiply(np.array(position_error), np.array(self.kp)) + np.multiply(vel_pos_error, self.kd)
self.torques = np.multiply(np.array(position_error), np.array(self.kp)) + np.multiply(vel_pos_error, self.kd)

# Return desired torques plus gravity compensations
# The computed self.torques may represent acceleration rather than actual torques,
# depending on the meaning of PD gains.
# This usage is used to maintain compatibility with other controllers in robosuite.
if self.desired_torque_as_acceleration:
self.torques = np.dot(self.mass_matrix, self.torques)

# Return desired torques plus gravity compensation
if self.use_torque_compensation:
self.torques = np.dot(self.mass_matrix, desired_torque) + self.torque_compensation
else:
self.torques = desired_torque
self.torques += self.torque_compensation
elif self.use_external_torque_compensation:
self.torques += self.external_torque_compensation

# Always run superclass call for any cleanups at the end
super().run_controller()
Expand Down
14 changes: 10 additions & 4 deletions robosuite/environments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,24 +375,30 @@ def unset_ep_meta(self):
"""
self._ep_meta = {}

def _update_observables(self, force=False):
def _update_observables(self, force=False, timestep=None):
"""
Updates all observables in this environment
Args:
force (bool): If True, will force all the observables to update their internal values to the newest
value. This is useful if, e.g., you want to grab observations when directly setting simulation states
without actually stepping the simulation.
timestep (float): If specified, will update the observables with the given timestep. Sometimes, we only
want to get the current time step of the observables without interrupt the observable update process.
"""
for observable in self._observables.values():
observable.update(timestep=self.model_timestep, obs_cache=self._obs_cache, force=force)
observable.update(
timestep=self.model_timestep if timestep is None else timestep, obs_cache=self._obs_cache, force=force
)

def _get_observations(self, force_update=False):
def _get_observations(self, force_update=False, timestep=None):
"""
Grabs observations from the environment.
Args:
force_update (bool): If True, will force all the observables to update their internal values to the newest
value. This is useful if, e.g., you want to grab observations when directly setting simulation states
without actually stepping the simulation.
timestep (float): If specified, will update the observables with the given timestep. Sometimes, we only
want to get the current time step of the observables without interrupt the observable update process.
Returns:
OrderedDict: OrderedDict containing observations [(name_string, np.array), ...]
"""
Expand All @@ -401,7 +407,7 @@ def _get_observations(self, force_update=False):

# Force an update if requested
if force_update:
self._update_observables(force=True)
self._update_observables(force=True, timestep=timestep)

# Loop through all observables and grab their current observation
for obs_name, observable in self._observables.items():
Expand Down
53 changes: 50 additions & 3 deletions robosuite/robots/robot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import copy
import json
import os
from collections import OrderedDict
from typing import Optional

import mujoco
import numpy as np

import robosuite.utils.transform_utils as T
Expand Down Expand Up @@ -373,6 +373,14 @@ def joint_acc(obs_cache):
names += arm_sensor_names
actives += [True] * len(arm_sensors)

if hasattr(self.robot_model, "torso_body"):
torso_sensors, torso_sensor_names = self._create_torso_sensors(
self.robot_model.torso_body, modality=modality
)
sensors += torso_sensors
names += torso_sensor_names
actives += [True] * len(torso_sensors)

base_sensors, base_sensor_names = self._create_base_sensors(modality=modality)
sensors += base_sensors
names += base_sensor_names
Expand Down Expand Up @@ -476,9 +484,48 @@ def gripper_qpos(obs_cache):
def gripper_qvel(obs_cache):
return np.array([self.sim.data.qvel[x] for x in self._ref_gripper_joint_vel_indexes[arm]])

sensors += [gripper_qpos, gripper_qvel]
names += [f"{pf}gripper_qpos", f"{pf}gripper_qvel"]
@sensor(modality=modality)
def gripper_qacc(obs_cache):
return np.array([self.sim.data.qacc[x] for x in self._ref_gripper_joint_vel_indexes[arm]])

sensors += [gripper_qpos, gripper_qvel, gripper_qacc]
names += [f"{pf}gripper_qpos", f"{pf}gripper_qvel", f"{pf}gripper_qacc"]

return sensors, names

def _create_torso_sensors(self, torso_body, modality):
"""
Helper function to create sensors for the robot torso. This will be
overriden by subclasses.
"""
sensors = []
names = []

for torso_name in torso_body:
pf = self.robot_model.naming_prefix
torso_index = self.sim.model.body_name2id(f"{pf}{torso_name}")

@sensor(modality=modality)
def torso_imu_quat(obs_cache):
return self.sim.data.xquat[torso_index]

@sensor(modality=modality)
def torso_imu_vel(obs_cache):
pose = np.zeros(6)
mujoco.mj_objectVelocity(
self.sim.model._model,
self.sim.data._data,
mujoco.mjtObj.mjOBJ_BODY,
torso_index,
pose,
1,
) # 1 for local frame
# Reorder velocity from [ang, lin] to [lin, ang]
pose[:3], pose[3:6] = pose[3:6].copy(), pose[:3].copy()
return pose

sensors += [torso_imu_quat, torso_imu_vel]
names += [f"{torso_name}_imu_quat", f"{torso_name}_imu_vel"]
return sensors, names

def _create_base_sensors(self, modality):
Expand Down