Skip to content

Commit 83eea58

Browse files
yhnsuyuanhaonanyuecideng
authored
update recent changes (#58)
Co-authored-by: yuanhaonan <yuanhaonan@dexforce.top> Co-authored-by: yuecideng <dengyueci@qq.com>
1 parent cfce9fe commit 83eea58

26 files changed

+455
-57
lines changed

embodichain/lab/devices/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@
1515
# ----------------------------------------------------------------------------
1616

1717
from .device import Device
18+
from .device_controller import DeviceController
19+
20+
__all__ = [
21+
"Device",
22+
"DeviceController",
23+
]
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
# ----------------------------------------------------------------------------
2+
# Copyright (c) 2021-2025 DexForce Technology Co., Ltd.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ----------------------------------------------------------------------------
16+
17+
from __future__ import annotations
18+
19+
import torch
20+
from typing import TYPE_CHECKING, Dict, Any, Optional, List, Union
21+
22+
from embodichain.lab.devices.device import Device
23+
from embodichain.utils import logger
24+
25+
if TYPE_CHECKING:
26+
from embodichain.lab.sim.objects import Robot
27+
28+
29+
class DeviceController:
30+
"""Controller that bridges input devices (VR, keyboard, etc.) with robot control.
31+
32+
This controller is agnostic to the environment and can be used in both
33+
gym environments and pure simulation contexts. It handles:
34+
- Mapping device input to robot joint commands
35+
- Joint limit enforcement
36+
- Filtering and smoothing (if not handled by device)
37+
- Multi-device support
38+
39+
Example:
40+
# In gym environment
41+
controller = DeviceController(robot=env.robot, device=vr_device)
42+
action = controller.get_action()
43+
env.step(action)
44+
45+
# In pure simulation
46+
controller = DeviceController(robot=sim_robot, device=vr_device)
47+
qpos = controller.get_action()
48+
sim_robot.set_qpos(qpos)
49+
"""
50+
51+
def __init__(
52+
self,
53+
robot: Robot,
54+
device: Optional[Device] = None,
55+
device_name: str = "default",
56+
):
57+
"""Initialize Device Controller.
58+
59+
Args:
60+
robot: Robot instance to control.
61+
device: Input device (VR, keyboard, etc.). Can be None initially.
62+
device_name: Name identifier for this device.
63+
"""
64+
self.robot = robot
65+
self._devices: Dict[str, Device] = {}
66+
self._active_device_name: Optional[str] = None
67+
68+
if device is not None:
69+
self.add_device(device, device_name, set_active=True)
70+
71+
# Joint mapping state: maps device joint name -> robot joint index
72+
self._joint_mapping: Dict[str, int] = {}
73+
self._mapping_initialized = False
74+
75+
logger.log_info(f"Device Controller initialized for robot: {robot.uid}")
76+
logger.log_info(f" Robot has {len(robot.joint_names)} joints")
77+
78+
def add_device(
79+
self, device: Device, device_name: str, set_active: bool = False
80+
) -> None:
81+
"""Add a new input device.
82+
83+
Args:
84+
device: Device instance to add.
85+
device_name: Name identifier for the device.
86+
set_active: Whether to set this as the active device.
87+
"""
88+
self._devices[device_name] = device
89+
90+
if set_active or self._active_device_name is None:
91+
self._active_device_name = device_name
92+
93+
logger.log_info(f"Added device '{device_name}' (Active: {set_active})")
94+
95+
def remove_device(self, device_name: str) -> None:
96+
"""Remove a device.
97+
98+
Args:
99+
device_name: Name of the device to remove.
100+
"""
101+
if device_name in self._devices:
102+
del self._devices[device_name]
103+
logger.log_info(f"Removed device '{device_name}'")
104+
105+
if self._active_device_name == device_name:
106+
self._active_device_name = (
107+
list(self._devices.keys())[0] if self._devices else None
108+
)
109+
110+
def set_active_device(self, device_name: str) -> None:
111+
"""Set the active input device.
112+
113+
Args:
114+
device_name: Name of the device to activate.
115+
"""
116+
if device_name not in self._devices:
117+
logger.log_error(f"Device '{device_name}' not found")
118+
return
119+
120+
self._active_device_name = device_name
121+
logger.log_info(f"Active device set to '{device_name}'")
122+
123+
def get_action(
124+
self, device_name: Optional[str] = None, as_dict: bool = False
125+
) -> Union[torch.Tensor, Dict[str, float], None]:
126+
"""Get robot action from device input.
127+
128+
Args:
129+
device_name: Name of device to query. If None, uses active device.
130+
as_dict: Whether to return action as dict (joint_name -> value).
131+
132+
Returns:
133+
Robot action tensor (shape: [num_envs, num_joints]) or dict,
134+
or None if no valid data available.
135+
"""
136+
# Get device
137+
device_name = device_name or self._active_device_name
138+
if device_name is None or device_name not in self._devices:
139+
return None
140+
141+
device = self._devices[device_name]
142+
143+
# Get device state
144+
state = device.get_controller_state()
145+
device_data = state.get("filtered_data") or state.get("raw_data")
146+
147+
if device_data is None:
148+
return None
149+
150+
# Map device data to robot action
151+
return self._map_device_to_robot(device_data, as_dict=as_dict)
152+
153+
def _map_device_to_robot(
154+
self, device_data: Dict[str, float], as_dict: bool = False
155+
) -> Union[torch.Tensor, Dict[str, float], None]:
156+
"""Map device input to robot action.
157+
158+
Args:
159+
device_data: Device joint data (joint_name -> value).
160+
as_dict: Whether to return as dict instead of tensor.
161+
162+
Returns:
163+
Robot action or None if mapping failed.
164+
"""
165+
try:
166+
robot_joint_names = self.robot.joint_names
167+
168+
# Build joint mapping on first call
169+
if not self._mapping_initialized:
170+
self._build_joint_mapping(device_data, robot_joint_names)
171+
172+
# Extract joint values based on mapping
173+
joint_values = []
174+
joint_indices = []
175+
mapped_joints = {}
176+
177+
for device_joint, robot_idx in self._joint_mapping.items():
178+
if device_joint in device_data:
179+
value = device_data[device_joint]
180+
joint_values.append(value)
181+
joint_indices.append(robot_idx)
182+
mapped_joints[robot_joint_names[robot_idx]] = value
183+
184+
if len(joint_indices) == 0:
185+
return None
186+
187+
# Return as dict if requested
188+
if as_dict:
189+
return mapped_joints
190+
191+
# Convert to tensor and create full action
192+
device_tensor = torch.tensor(
193+
joint_values, dtype=torch.float32, device=self.robot.device
194+
)
195+
indices_tensor = torch.tensor(
196+
joint_indices, dtype=torch.long, device=self.robot.device
197+
)
198+
199+
# Get current robot qpos
200+
current_qpos = self.robot.get_qpos() # [num_envs, num_joints]
201+
202+
# Create action by updating controlled joints
203+
action = current_qpos.clone()
204+
action[:, indices_tensor] = device_tensor.unsqueeze(0)
205+
206+
# Enforce joint limits
207+
action = self._enforce_joint_limits(action)
208+
209+
return action
210+
211+
except Exception as e:
212+
logger.log_error(f"Error mapping device data to robot action: {e}")
213+
return None
214+
215+
def _build_joint_mapping(
216+
self, device_data: Dict[str, float], robot_joint_names: List[str]
217+
) -> None:
218+
"""Build mapping from device joints to robot joints.
219+
220+
Args:
221+
device_data: Device joint data to determine available joints.
222+
robot_joint_names: Robot joint names.
223+
"""
224+
self._joint_mapping = {}
225+
226+
for robot_idx, robot_joint in enumerate(robot_joint_names):
227+
# Direct name matching (can be extended with more sophisticated mapping)
228+
if robot_joint in device_data:
229+
self._joint_mapping[robot_joint] = robot_idx
230+
231+
self._mapping_initialized = True
232+
233+
logger.log_info(
234+
f"Joint mapping initialized: {len(self._joint_mapping)} joints mapped"
235+
)
236+
logger.log_info(f" Mapped joints: {list(self._joint_mapping.keys())}")
237+
238+
def _enforce_joint_limits(self, action: torch.Tensor) -> torch.Tensor:
239+
"""Enforce robot joint limits on action.
240+
241+
Args:
242+
action: Action tensor [num_envs, num_joints].
243+
244+
Returns:
245+
Clamped action tensor.
246+
"""
247+
qpos_limits = self.robot.body_data.qpos_limits[0] # [num_joints, 2]
248+
return torch.clamp(action, qpos_limits[:, 0], qpos_limits[:, 1])
249+
250+
def reset(self) -> None:
251+
"""Reset controller state."""
252+
# Reset all devices
253+
for device in self._devices.values():
254+
if hasattr(device, "reset"):
255+
device.reset()
256+
257+
# Reset mapping
258+
self._joint_mapping = {}
259+
self._mapping_initialized = False
260+
261+
logger.log_info("Device Controller reset")
262+
263+
def get_device_info(self, device_name: Optional[str] = None) -> Dict[str, Any]:
264+
"""Get information about a device.
265+
266+
Args:
267+
device_name: Name of device. If None, uses active device.
268+
269+
Returns:
270+
Device information dict.
271+
"""
272+
device_name = device_name or self._active_device_name
273+
if device_name is None or device_name not in self._devices:
274+
return {"error": "No device available"}
275+
276+
device = self._devices[device_name]
277+
state = device.get_controller_state()
278+
279+
return {
280+
"device_name": device_name,
281+
"is_active": device_name == self._active_device_name,
282+
"mapped_joints": list(self._joint_mapping.keys()),
283+
"device_state": state,
284+
}
285+
286+
def get_all_devices(self) -> List[str]:
287+
"""Get list of all registered device names."""
288+
return list(self._devices.keys())
289+
290+
@property
291+
def active_device(self) -> Optional[Device]:
292+
"""Get the currently active device."""
293+
if self._active_device_name is None:
294+
return None
295+
return self._devices.get(self._active_device_name)

embodichain/lab/gym/envs/embodied_env.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,19 @@ class EnvLightCfg:
9595
"""Data pipeline configuration. Defaults to None.
9696
"""
9797

98+
extensions: Union[Dict[str, Any], None] = None
99+
"""Extension parameters for task-specific configurations.
100+
101+
This field can be used to pass additional parameters that are specific to certain environments
102+
or tasks without modifying the base configuration class. For example:
103+
- obs_mode: Observation mode (e.g., "state", "image")
104+
- episode_length: Maximum episode length
105+
- joint_limits: Joint limit constraints
106+
- action_scale: Action scaling factor
107+
- vr_joint_mapping: VR joint mapping for teleoperation
108+
- control_frequency: Control frequency for VR teleoperation
109+
"""
110+
98111
# Some helper attributes
99112
filter_visual_rand: bool = False
100113
"""Whether to filter out visual randomization
@@ -134,17 +147,9 @@ def __init__(self, cfg: EmbodiedEnvCfg, **kwargs):
134147

135148
extensions = getattr(cfg, "extensions", {}) or {}
136149

137-
defaults = {
138-
"obs_mode": "state",
139-
"episode_length": 50,
140-
"joint_limits": 0.5,
141-
"action_scale": 0.1,
142-
}
143-
144-
for name, default in defaults.items():
145-
value = extensions.get(name, getattr(cfg, name, default))
150+
for name, value in extensions.items():
146151
setattr(cfg, name, value)
147-
setattr(self, name, getattr(cfg, name))
152+
setattr(self, name, value)
148153

149154
super().__init__(cfg, **kwargs)
150155

0 commit comments

Comments
 (0)