diff --git a/examples/run_hang_lifebuoy_teleop.py b/examples/run_hang_lifebuoy_teleop.py new file mode 100644 index 0000000..f5e6afb --- /dev/null +++ b/examples/run_hang_lifebuoy_teleop.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +Teleop script for Hang Lifebuoy environment. +""" + +import genesis as gs +import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardWrapper +from gs_env.sim.envs.config.registry import EnvArgsRegistry +from gs_env.sim.envs.manipulation.hang_lifebuoy_env import HangLifebuoyEnv + + +def main() -> None: + """Run teleop for hang lifebuoy task.""" + print("Initializing Hang Lifebuoy Teleop System...") + + # Initialize Genesis + gs.init( + seed=0, + precision="32", + logging_level="info", + backend=gs.cpu, + ) + + # Create teleop wrapper first (without environment) + print("Creating teleop wrapper...") + teleop_wrapper = KeyboardWrapper( + env=None, + device=torch.device("cpu"), + movement_speed=0.01, # Position movement speed + rotation_speed=0.05, # Rotation speed + trajectory_filename_prefix="hang_lifebuoy_", + ) + + # Start teleop wrapper (keyboard listener) FIRST, before creating Genesis scene + teleop_wrapper.start() + + # Create task environment AFTER teleop wrapper is running + env = HangLifebuoyEnv( + args=EnvArgsRegistry["hang_lifebuoy_default"], + device=torch.device("cpu"), + ) + teleop_wrapper.set_environment(env) + + print("\n" + "=" * 50) + print("Hang Lifebuoy TELEOP SYSTEM READY") + print("=" * 50) + print("šŸ“ TRAJECTORY RECORDING INSTRUCTIONS:") + print(" 1. Press 'r' to start recording (anytime)") + print(" 2. Move robot with arrow keys, n/m, space") + print(" 3. Press 'r' again to stop recording and save") + print(" šŸ’” Recording works from any state now!") + print("=" * 50) + + # Run the main control loop in the main thread (Genesis viewer requires this) + try: + step_count = 0 + while teleop_wrapper.running: + # Step the teleop wrapper (this processes input and steps environment) + teleop_wrapper.step(torch.tensor([])) + step_count += 1 + + # Check for quit command + if ( + hasattr(teleop_wrapper, "last_command") + and teleop_wrapper.last_command + and hasattr(teleop_wrapper.last_command, "quit_teleop") + and teleop_wrapper.last_command.quit_teleop + ): + print("Quit command received, exiting...") + break + + # Safety check - exit after 1 hour of running + if step_count > 180000: # 1 hour at 50Hz + print("Maximum runtime reached, exiting...") + break + + except KeyboardInterrupt: + print("\nšŸ‘‹ Teleop interrupted by user") + + finally: + # Cleanup + teleop_wrapper.stop() + print("āœ… Teleop session ended") + + +if __name__ == "__main__": + main() diff --git a/examples/run_pick_cube_teleop.py b/examples/run_pick_cube_teleop.py index 9869251..58c88e7 100644 --- a/examples/run_pick_cube_teleop.py +++ b/examples/run_pick_cube_teleop.py @@ -35,7 +35,7 @@ def main() -> None: seed=0, precision="32", logging_level="info", - backend=gs.cpu, # type: ignore + backend=gs.cpu, ) try: @@ -49,7 +49,7 @@ def main() -> None: ) # Start teleop wrapper (keyboard listener) FIRST, before creating Genesis scene - teleop_wrapper.start() # type: ignore + teleop_wrapper.start() # Create task environment AFTER teleop wrapper is running env = create_gs_env() diff --git a/examples/run_put_bowl_teleop.py b/examples/run_put_bowl_teleop.py new file mode 100644 index 0000000..779714b --- /dev/null +++ b/examples/run_put_bowl_teleop.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +Teleop script for Put Bowl Inside Microwave environment. +""" + +import genesis as gs +import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardWrapper +from gs_env.sim.envs.config.registry import EnvArgsRegistry +from gs_env.sim.envs.manipulation.put_bowl_inside_microwave_env import PutBowlInsideMicrowaveEnv + + +def main() -> None: + """Run teleop for put bowl inside microwave task.""" + print("Initializing Put Bowl Inside Microwave Teleop System...") + + # Initialize Genesis + gs.init( + seed=0, + precision="32", + logging_level="info", + backend=gs.cpu, # type: ignore + ) + + # Create teleop wrapper first (without environment) + print("Creating teleop wrapper...") + teleop_wrapper = KeyboardWrapper( + env=None, + device=torch.device("cpu"), + movement_speed=0.01, # Position movement speed + rotation_speed=0.05, # Rotation speed + trajectory_filename_prefix="put_bowl_microwave_", + ) + + # Start teleop wrapper (keyboard listener) FIRST, before creating Genesis scene + teleop_wrapper.start() # type: ignore + + # Create task environment AFTER teleop wrapper is running + env = PutBowlInsideMicrowaveEnv( + args=EnvArgsRegistry["put_bowl_inside_microwave_default"], + device=torch.device("cpu"), + ) + teleop_wrapper.set_environment(env) + + print("\n" + "=" * 50) + print("Put Bowl Inside Microwave TELEOP SYSTEM READY") + print("=" * 50) + print("šŸ“ TRAJECTORY RECORDING INSTRUCTIONS:") + print(" 1. Press 'r' to start recording (anytime)") + print(" 2. Move robot with arrow keys, n/m, space") + print(" 3. Press 'r' again to stop recording and save") + print(" šŸ’” Recording works from any state now!") + print("=" * 50) + + # Run the main control loop in the main thread (Genesis viewer requires this) + try: + step_count = 0 + while teleop_wrapper.running: + # Step the teleop wrapper (this processes input and steps environment) + teleop_wrapper.step(torch.tensor([])) + step_count += 1 + + # Check for quit command + if ( + hasattr(teleop_wrapper, "last_command") + and teleop_wrapper.last_command + and hasattr(teleop_wrapper.last_command, "quit_teleop") + and teleop_wrapper.last_command.quit_teleop + ): + print("Quit command received, exiting...") + break + + # Safety check - exit after 1 hour of running + if step_count > 180000: # 1 hour at 50Hz + print("Maximum runtime reached, exiting...") + break + + except KeyboardInterrupt: + print("\nšŸ‘‹ Teleop interrupted by user") + + finally: + # Cleanup + teleop_wrapper.stop() + print("āœ… Teleop session ended") + + +if __name__ == "__main__": + main() diff --git a/examples/run_sweep_table_teleop.py b/examples/run_sweep_table_teleop.py new file mode 100644 index 0000000..c28a111 --- /dev/null +++ b/examples/run_sweep_table_teleop.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +Teleop script for Sweep Table environment. +""" + +import genesis as gs +import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardWrapper +from gs_env.sim.envs.config.registry import EnvArgsRegistry +from gs_env.sim.envs.manipulation.sweep_table_env import SweepTableEnv + + +def main() -> None: + """Run teleop for sweep table task.""" + print("Initializing Sweep Table Teleop System...") + + # Initialize Genesis + gs.init( + seed=0, + precision="32", + logging_level="info", + backend=gs.cpu, + ) + + # Create teleop wrapper first (without environment) + print("Creating teleop wrapper...") + teleop_wrapper = KeyboardWrapper( + env=None, + device=torch.device("cpu"), + movement_speed=0.01, # Position movement speed + rotation_speed=0.05, # Rotation speed + trajectory_filename_prefix="sweep_table_", + ) + + # Start teleop wrapper (keyboard listener) FIRST, before creating Genesis scene + teleop_wrapper.start() + + # Create task environment AFTER teleop wrapper is running + env = SweepTableEnv( + args=EnvArgsRegistry["sweep_table_default"], + device=torch.device("cpu"), + ) + teleop_wrapper.set_environment(env) + + print("\n" + "=" * 50) + print("Sweep Table TELEOP SYSTEM READY") + print("=" * 50) + print("šŸ“ TRAJECTORY RECORDING INSTRUCTIONS:") + print(" 1. Press 'r' to start recording (anytime)") + print(" 2. Move robot with arrow keys, n/m, space") + print(" 3. Press 'r' again to stop recording and save") + print(" šŸ’” Recording works from any state now!") + print("=" * 50) + + # Run the main control loop in the main thread (Genesis viewer requires this) + try: + step_count = 0 + while teleop_wrapper.running: + # Step the teleop wrapper (this processes input and steps environment) + teleop_wrapper.step(torch.tensor([])) + step_count += 1 + + # Check for quit command + if ( + hasattr(teleop_wrapper, "last_command") + and teleop_wrapper.last_command + and hasattr(teleop_wrapper.last_command, "quit_teleop") + and teleop_wrapper.last_command.quit_teleop + ): + print("Quit command received, exiting...") + break + + # Safety check - exit after 1 hour of running + if step_count > 180000: # 1 hour at 50Hz + print("Maximum runtime reached, exiting...") + break + + except KeyboardInterrupt: + print("\nšŸ‘‹ Teleop interrupted by user") + + finally: + # Cleanup + teleop_wrapper.stop() + print("āœ… Teleop session ended") + + +if __name__ == "__main__": + main() diff --git a/src/agent/gs_agent/bases/env_wrapper.py b/src/agent/gs_agent/bases/env_wrapper.py index 5daec0a..6f09835 100644 --- a/src/agent/gs_agent/bases/env_wrapper.py +++ b/src/agent/gs_agent/bases/env_wrapper.py @@ -14,15 +14,15 @@ def __init__(self, env: Any, device: torch.device) -> None: self.device: Final[torch.device] = device @abstractmethod - def reset(self) -> tuple[torch.Tensor, dict[str, Any]]: ... + def reset(self) -> tuple[dict[str, Any], dict[str, Any]]: ... @abstractmethod def step( self, action: torch.Tensor - ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: ... + ) -> tuple[dict[str, Any], torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: ... @abstractmethod - def get_observations(self) -> torch.Tensor: ... + def get_observations(self) -> dict[str, Any]: ... @property @abstractmethod diff --git a/src/agent/gs_agent/wrappers/gs_env_wrapper.py b/src/agent/gs_agent/wrappers/gs_env_wrapper.py index ed3b4f0..329699d 100644 --- a/src/agent/gs_agent/wrappers/gs_env_wrapper.py +++ b/src/agent/gs_agent/wrappers/gs_env_wrapper.py @@ -22,14 +22,14 @@ def __init__( # --------------------------- # BatchEnvWrapper API (batch) # --------------------------- - def reset(self) -> tuple[torch.Tensor, dict[str, Any]]: + def reset(self) -> tuple[dict[str, Any], dict[str, Any]]: self.env.reset() self._curr_obs = self.env.get_observations() return self._curr_obs, self.env.get_extra_infos() def step( self, action: torch.Tensor - ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: + ) -> tuple[dict[str, Any], torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: # apply action self.env.apply_action(action) # get observations @@ -55,7 +55,7 @@ def step( extra_infos["reward_terms"] = reward_terms return next_obs, reward, terminated, truncated, extra_infos - def get_observations(self) -> torch.Tensor: + def get_observations(self) -> dict[str, Any]: return self.env.get_observations() @property diff --git a/src/agent/gs_agent/wrappers/gym_env_wrapper.py b/src/agent/gs_agent/wrappers/gym_env_wrapper.py index 0ab2019..112355d 100644 --- a/src/agent/gs_agent/wrappers/gym_env_wrapper.py +++ b/src/agent/gs_agent/wrappers/gym_env_wrapper.py @@ -20,14 +20,14 @@ def __init__(self, env: gym.Env[Any, Any], device: torch.device = _DEFAULT_DEVIC # --------------------------- # BatchEnvWrapper API (batch) # --------------------------- - def reset(self) -> tuple[torch.Tensor, dict[str, Any]]: + def reset(self) -> tuple[dict[str, Any], dict[str, Any]]: obs, info = self.env.reset() self._curr_obs = torch.tensor(obs, device=self.device).unsqueeze(0) - return self._curr_obs, info + return {"obs": self._curr_obs}, info def step( self, action: torch.Tensor - ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: + ) -> tuple[dict[str, Any], torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: gym_actions = action.clone().cpu().numpy().squeeze(0) # shape: [action_dim] obs, reward, terminated, truncated, info = self.env.step(gym_actions) @@ -42,10 +42,10 @@ def step( reward_batch = torch.as_tensor([[float(reward)]], device=self.device) terminated_batch = torch.as_tensor([[terminated]], device=self.device) truncated_batch = torch.as_tensor([[truncated]], device=self.device) - return self._curr_obs, reward_batch, terminated_batch, truncated_batch, info + return {"obs": self._curr_obs}, reward_batch, terminated_batch, truncated_batch, info - def get_observations(self) -> torch.Tensor: - return self._curr_obs + def get_observations(self) -> dict[str, Any]: + return {"obs": self._curr_obs} @property def action_dim(self) -> int: diff --git a/src/agent/gs_agent/wrappers/teleop_wrapper.py b/src/agent/gs_agent/wrappers/teleop_wrapper.py index f3f1826..7a7894d 100644 --- a/src/agent/gs_agent/wrappers/teleop_wrapper.py +++ b/src/agent/gs_agent/wrappers/teleop_wrapper.py @@ -4,7 +4,6 @@ import time from typing import Any -import numpy as np import torch from pynput import keyboard @@ -169,15 +168,15 @@ def stop(self) -> None: if self.listener: self.listener.stop() - def reset(self) -> tuple[torch.Tensor, dict[str, Any]]: + def reset(self) -> tuple[dict[str, Any], dict[str, Any]]: """Reset the environment.""" self._env.reset_idx(torch.IntTensor([0])) obs = self._convert_observation_to_dict() - return torch.tensor([]), obs + return obs, {} def step( self, action: torch.Tensor - ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: + ) -> tuple[dict[str, Any], torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: """Step the environment with teleop input.""" # Process keyboard input and create command command = self._process_input() @@ -194,34 +193,28 @@ def step( # Return teleop-specific format (rewards/termination not applicable) return ( - torch.tensor([]), # next_obs + obs, # next_obs torch.tensor([0.0]), # reward torch.tensor([False]), # terminated torch.tensor([False]), # truncated - obs, # extra_infos + {}, # extra_infos ) - def get_observations(self) -> torch.Tensor: + def get_observations(self) -> dict[str, Any]: """Get current observations.""" return self._env.get_observations() def _convert_observation_to_dict(self) -> dict[str, Any]: - """Convert tensor observation to dictionary format for teleop compatibility.""" - - # Get cube position - cube_pos = np.array(self._env.entities["cube"].get_pos()) - cube_quat = np.array(self._env.entities["cube"].get_quat()) - - # Create observation dictionary (for teleop compatibility) - observation = { - "ee_pose": self._env.entities["robot"].ee_pose, - # "end_effector_pos": robot_obs["end_effector_pos"], - # "end_effector_quat": robot_obs["end_effector_quat"], - "cube_pos": cube_pos, - "cube_quat": cube_quat, - "rgb_images": {}, # No cameras in this simple setup - "depth_images": {}, # No depth sensors in this simple setup - } + """Convert observation to dictionary format for teleop compatibility.""" + if not hasattr(self, "_env") or self._env is None: + return {} + + # Get the observation dictionary from the environment + observation = self._env.get_observations() + + # Add teleop-specific fields + observation["rgb_images"] = {} # No cameras in this simple setup + observation["depth_images"] = {} # No depth sensors in this simple setup return observation diff --git a/src/env/gs_env/common/bases/base_env.py b/src/env/gs_env/common/bases/base_env.py index 1d367fa..bbc0fcb 100644 --- a/src/env/gs_env/common/bases/base_env.py +++ b/src/env/gs_env/common/bases/base_env.py @@ -2,6 +2,7 @@ from collections.abc import Mapping from typing import Any, Final +import numpy as np import torch @@ -27,7 +28,7 @@ def reset_idx(self, envs_idx: torch.IntTensor) -> None: ... def apply_action(self, action: torch.Tensor) -> None: ... @abstractmethod - def get_observations(self) -> torch.Tensor: ... + def get_observations(self) -> dict[str, Any]: ... @abstractmethod def get_extra_infos(self) -> dict[str, Any]: ... @@ -48,6 +49,90 @@ def num_envs(self) -> int: ... def observation_spec(self) -> Mapping[str, Any]: return {} + def _setup_camera( + self, + pos: tuple[float, float, float] = (1.5, 0.0, 0.7), + lookat: tuple[float, float, float] = (0.2, 0.0, 0.1), + fov: int = 50, + resolution: tuple[int, int] = (640, 480), + ) -> None: + """Setup camera for image capture. Override in subclasses that support cameras.""" + # Default implementation does nothing - environments without cameras can ignore this + # This is intentionally empty to allow environments without cameras to skip camera setup + return + + def _process_rgb_tensor( + self, rgb: torch.Tensor | np.ndarray[Any, Any] | None, normalize: bool = True + ) -> torch.Tensor | None: + """Process raw RGB tensor from camera render. Common processing logic.""" + if rgb is None: + return None + + try: + # Convert numpy array to torch tensor if needed + if not isinstance(rgb, torch.Tensor): + # Make a copy to avoid negative stride issues + rgb = torch.from_numpy(rgb.copy()) + + # Convert to proper format: (H, W, C) or (B, H, W, C) -> (B, C, H, W) + if rgb.dim() == 3: # Single image (H, W, C) + rgb = rgb.permute(2, 0, 1).unsqueeze(0)[:, :3] # -> (1, 3, H, W) + else: # Batch (B, H, W, C) + rgb = rgb.permute(0, 3, 1, 2)[:, :3] # -> (B, 3, H, W) + + # Normalize if requested + if normalize: + rgb = torch.clamp(rgb, min=0.0, max=255.0) / 255.0 + + return rgb + except Exception as e: + print(f"Warning: Could not process RGB tensor: {e}") + return None + + def get_rgb_image(self, normalize: bool = True) -> torch.Tensor | None: + """Capture RGB image from camera (if available). Override in subclasses.""" + return None + + def save_camera_image(self, filename: str = "camera_capture.png") -> None: + """Save the current camera image to a file for inspection.""" + try: + import matplotlib.pyplot as plt + + # Get the raw image (not normalized) + rgb_image = self.get_rgb_image(normalize=False) + + if rgb_image is not None: + # Convert from (B, C, H, W) to (H, W, C) for display + if rgb_image.dim() == 4: # Batch + image = rgb_image[0].permute(1, 2, 0).cpu().numpy() + else: # Single image + image = rgb_image.permute(1, 2, 0).cpu().numpy() + + # Ensure values are in 0-255 range + image = np.clip(image, 0, 255).astype(np.uint8) + + # Save the image + plt.imsave(filename, image) + print(f"Camera image saved to: {filename}") + else: + print("No camera image available to save") + except Exception as e: + print(f"Error saving camera image: {e}") + + def _add_rgb_to_observations(self, observations: dict[str, Any]) -> dict[str, Any]: + """Add RGB images to observations dictionary. Call this from get_observations().""" + # Capture RGB image + rgb_images = {} + rgb_image = self.get_rgb_image() + if rgb_image is not None: + rgb_images["camera"] = rgb_image + + # Add RGB images to observations + observations["rgb_images"] = rgb_images + observations["depth_images"] = {} # Placeholder for future depth camera support + + return observations + def action_spec(self) -> Mapping[str, Any]: return {} diff --git a/src/env/gs_env/sim/envs/__init__.py b/src/env/gs_env/sim/envs/__init__.py index e69de29..7a64bab 100644 --- a/src/env/gs_env/sim/envs/__init__.py +++ b/src/env/gs_env/sim/envs/__init__.py @@ -0,0 +1,15 @@ +from .manipulation import ( + GoalReachingEnv, + HangLifebuoyEnv, + PickCubeEnv, + PutBowlInsideMicrowaveEnv, + SweepTableEnv, +) + +__all__ = [ + "GoalReachingEnv", + "HangLifebuoyEnv", + "PickCubeEnv", + "PutBowlInsideMicrowaveEnv", + "SweepTableEnv", +] diff --git a/src/env/gs_env/sim/envs/config/registry.py b/src/env/gs_env/sim/envs/config/registry.py index 2fc817e..44ab1a1 100644 --- a/src/env/gs_env/sim/envs/config/registry.py +++ b/src/env/gs_env/sim/envs/config/registry.py @@ -56,3 +56,85 @@ reward_args={}, img_resolution=(480, 270), ) + + +EnvArgsRegistry["put_bowl_inside_microwave_default"] = EnvArgs( + gs_init_args=GenesisInitArgsRegistry["default"], + scene_args=SceneArgsRegistry["flat_scene_default"], + robot_args=RobotArgsRegistry["franka_teleop"], + objects_args=[], # Objects are created directly in the environment + sensors_args=[], + reward_args={}, + img_resolution=(480, 270), + env_config={ + "show_viewer": True, + "show_FPS": False, + "table_pos": (0.0, 0.0, 0.05), + "table_size": (0.6, 0.6, 0.1), + "bowl_scale": 1 / 5000, + "microwave_scale": 0.3, + "microwave_pos": (0.2, 0.2, 0.18), + "microwave_euler": (0, 0, 30), + }, +) + + +EnvArgsRegistry["hang_lifebuoy_default"] = EnvArgs( + gs_init_args=GenesisInitArgsRegistry["default"], + scene_args=SceneArgsRegistry["flat_scene_default"], + robot_args=RobotArgsRegistry["franka_teleop"], + objects_args=[], # Objects are created directly in the environment + sensors_args=[], + reward_args={}, + img_resolution=(480, 270), + env_config={ + "show_viewer": True, + "show_FPS": False, + "table_pos": (0.0, 0.0, 0.05), + "table_size": (0.6, 0.6, 0.1), + "lifebuoy_scale": 0.03, + "hanger_scale": (10, 5, 10), + "hanger_pos": (0.05, -0.2, 0.15), + "hanger_euler": (90, 0, 90), + }, +) + + +EnvArgsRegistry["sweep_table_default"] = EnvArgs( + gs_init_args=GenesisInitArgsRegistry["default"], + scene_args=SceneArgsRegistry["flat_scene_default"], + robot_args=RobotArgsRegistry["franka_teleop"], + objects_args=[], # Objects are created directly in the environment + sensors_args=[], + reward_args={}, + img_resolution=(480, 270), + env_config={ + "show_viewer": True, + "show_FPS": False, + "table_pos": (0.0, 0.0, 0.05), + "table_size": (0.6, 0.6, 0.1), + "broom_scale": (1 / 400, 1 / 800, 1 / 400), + "broom_pos": (0.05, -0.2, 0.15), + "broom_euler": (90, 0, 90), + "trashbox_size": (0.03, 0.03, 0.03), + "target_zone_pos": (0.1, 0.3, 0.045), + "target_zone_size": (0.3, 0.3, 0.003), + }, +) + + +EnvArgsRegistry["pick_cube_default"] = EnvArgs( + gs_init_args=GenesisInitArgsRegistry["default"], + scene_args=SceneArgsRegistry["flat_scene_default"], + robot_args=RobotArgsRegistry["franka_teleop"], + objects_args=[], # Objects are created directly in the environment + sensors_args=[], + reward_args={}, + img_resolution=(480, 270), + env_config={ + "show_viewer": True, + "show_FPS": False, + "cube_pos": (0.5, 0.0, 0.07), + "cube_size": (0.04, 0.04, 0.04), + }, +) diff --git a/src/env/gs_env/sim/envs/config/schema.py b/src/env/gs_env/sim/envs/config/schema.py index 86d1cd4..6aaf8dc 100644 --- a/src/env/gs_env/sim/envs/config/schema.py +++ b/src/env/gs_env/sim/envs/config/schema.py @@ -28,3 +28,5 @@ class EnvArgs(BaseModel): sensors_args: list[SensorArgs] reward_args: dict[str, float] img_resolution: tuple[int, int] + # Environment-specific configuration + env_config: dict[str, float | bool | tuple[float, ...]] = {} diff --git a/src/env/gs_env/sim/envs/manipulation/__init__.py b/src/env/gs_env/sim/envs/manipulation/__init__.py index b43820f..b3231af 100644 --- a/src/env/gs_env/sim/envs/manipulation/__init__.py +++ b/src/env/gs_env/sim/envs/manipulation/__init__.py @@ -1,5 +1,13 @@ from .goal_reaching_env import GoalReachingEnv +from .hang_lifebuoy_env import HangLifebuoyEnv +from .pick_cube_env import PickCubeEnv +from .put_bowl_inside_microwave_env import PutBowlInsideMicrowaveEnv +from .sweep_table_env import SweepTableEnv __all__ = [ "GoalReachingEnv", + "HangLifebuoyEnv", + "PickCubeEnv", + "PutBowlInsideMicrowaveEnv", + "SweepTableEnv", ] diff --git a/src/env/gs_env/sim/envs/manipulation/goal_reaching_env.py b/src/env/gs_env/sim/envs/manipulation/goal_reaching_env.py index fa2c5cb..ba373ba 100644 --- a/src/env/gs_env/sim/envs/manipulation/goal_reaching_env.py +++ b/src/env/gs_env/sim/envs/manipulation/goal_reaching_env.py @@ -30,7 +30,6 @@ def __init__( ) -> None: super().__init__(device=device) self._num_envs = num_envs - self._device = device self._show_viewer = show_viewer self._args = args @@ -53,6 +52,9 @@ def __init__( device=self.device, ) + # Add camera for image capture + self._setup_camera() + # == setup target entity self._target = self._scene.add_entity( gs.morphs.Box(size=(0.05, 0.05, 0.05), collision=False), @@ -72,6 +74,37 @@ def __init__( self._init() self.reset() + def _setup_camera( + self, + pos: tuple[float, float, float] = (1.5, 0.0, 0.7), + lookat: tuple[float, float, float] = (0.2, 0.0, 0.1), + fov: int = 50, + resolution: tuple[int, int] = (640, 480), + ) -> None: + """Setup camera for image capture using Genesis camera renderer.""" + # Add camera to scene (Genesis-specific) + self.camera = self._scene.scene.add_camera( + res=resolution, + pos=pos, # Camera position + lookat=lookat, # Camera lookat point + fov=fov, # Field of view + GUI=False, # Don't show in GUI + ) + + def get_rgb_image(self, normalize: bool = True) -> torch.Tensor | None: + """Capture RGB image from camera.""" + try: + # Render camera image (Genesis-specific) + rgb, _, _, _ = self.camera.render( + rgb=True, depth=False, segmentation=False, normal=False + ) + + # Use base class processing logic + return self._process_rgb_tensor(rgb, normalize) + except Exception as e: + print(f"Warning: Could not capture camera image: {e}") + return None + def _init(self) -> None: # specify the space attributes self._action_space = self._robot.action_space @@ -90,12 +123,12 @@ def _init(self) -> None: self._info_space = gym.spaces.Dict({}) # - self.goal_pose = torch.zeros(self.num_envs, 7, dtype=torch.float32, device=self._device) - self.time_since_reset = torch.zeros(self.num_envs, device=self._device) + self.goal_pose = torch.zeros(self.num_envs, 7, dtype=torch.float32, device=self.device) + self.time_since_reset = torch.zeros(self.num_envs, device=self.device) self.keypoints_offset = self.get_keypoint_offsets( - batch_size=self.num_envs, device=self._device, unit_length=0.5 + batch_size=self.num_envs, device=self.device, unit_length=0.5 ) - self.action_buf = torch.zeros((self.num_envs, self.action_dim), device=self._device) + self.action_buf = torch.zeros((self.num_envs, self.action_dim), device=self.device) def reset_idx(self, envs_idx: torch.IntTensor) -> None: if len(envs_idx) == 0: @@ -104,28 +137,28 @@ def reset_idx(self, envs_idx: torch.IntTensor) -> None: # Generate random goal positions num_reset = len(envs_idx) # Random position within reachable workspace - random_x = torch.rand(num_reset, device=self._device) * 0.3 + 0.15 # 0.15 to 0.45 - random_y = (torch.rand(num_reset, device=self._device) - 0.5) * 0.4 # -0.2 to 0.2 - random_z = torch.rand(num_reset, device=self._device) * 0.2 + 0.1 # 0.1 to 0.3 + random_x = torch.rand(num_reset, device=self.device) * 0.3 + 0.15 # 0.15 to 0.45 + random_y = (torch.rand(num_reset, device=self.device) - 0.5) * 0.4 # -0.2 to 0.2 + random_z = torch.rand(num_reset, device=self.device) * 0.2 + 0.1 # 0.1 to 0.3 self.goal_pose[envs_idx, 0] = random_x self.goal_pose[envs_idx, 1] = random_y self.goal_pose[envs_idx, 2] = random_z self.goal_pose[envs_idx] = torch.tensor( - [0.2, 0.0, 0.2, 1.0, 0.0, 0.0, 0.0], dtype=torch.float32, device=self._device + [0.2, 0.0, 0.2, 1.0, 0.0, 0.0, 0.0], dtype=torch.float32, device=self.device + ) + q_down = torch.tensor([0.0, 0.0, 1.0, 0.0], dtype=torch.float32, device=self.device).repeat( + num_reset, 1 ) - q_down = torch.tensor( - [0.0, 0.0, 1.0, 0.0], dtype=torch.float32, device=self._device - ).repeat(num_reset, 1) - random_yaw = torch.rand(num_reset, device=self._device) * 2 * np.pi - np.pi # -pi to pi + random_yaw = torch.rand(num_reset, device=self.device) * 2 * np.pi - np.pi # -pi to pi random_yaw *= 0.25 # reduce the range to [-pi/4, pi/4] # random_yaw *= 0.0 # reduce the range to [-pi/4, pi/4] q_yaw = torch.stack( [ torch.cos(random_yaw / 2), - torch.zeros(num_reset, device=self._device), - torch.zeros(num_reset, device=self._device), + torch.zeros(num_reset, device=self.device), + torch.zeros(num_reset, device=self.device), torch.sin(random_yaw / 2), ], dim=-1, @@ -144,21 +177,25 @@ def get_truncated(self) -> torch.Tensor: time_out_buf = self.time_since_reset > self._max_sim_time return time_out_buf - def get_observations(self) -> torch.Tensor: + def get_observations(self) -> dict[str, Any]: # Current end-effector pose ee_pos, ee_quat = self._robot.ee_pose[:, :3], self._robot.ee_pose[:, 3:7] # pos_diff = ee_pos - self.goal_pose[:, :3] - obs_components = [ - pos_diff, # 3D position difference - ee_quat, # current orientation (4D quaternion) - self.goal_pose, # goal pose (7D: pos + quat) - ] - return torch.cat(obs_components, dim=-1) + + observations = { + "pose_vec": pos_diff, # 3D position difference + "ee_quat": ee_quat, # current orientation (4D quaternion) + "ref_position": self.goal_pose[:, :3], # goal position + "ref_quat": self.goal_pose[:, 3:7], # goal quaternion + } + + # Add RGB images using base class helper + return self._add_rgb_to_observations(observations) def apply_action(self, action: torch.Tensor) -> None: action = self.rescale_action(action) - self.action_buf[:] = action.clone().to(self._device) + self.action_buf[:] = action.clone().to(self.device) self.time_since_reset += self._scene.scene.dt self._robot.apply_action(action=action) self._scene.scene.step() @@ -168,7 +205,7 @@ def get_extra_infos(self) -> dict[str, Any]: def rescale_action(self, action: torch.Tensor) -> torch.Tensor: action_scale: torch.Tensor = torch.tensor( - [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], dtype=torch.float32, device=self._device + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], dtype=torch.float32, device=self.device ) return action * action_scale diff --git a/src/env/gs_env/sim/envs/manipulation/hang_lifebuoy_env.py b/src/env/gs_env/sim/envs/manipulation/hang_lifebuoy_env.py new file mode 100644 index 0000000..0414df3 --- /dev/null +++ b/src/env/gs_env/sim/envs/manipulation/hang_lifebuoy_env.py @@ -0,0 +1,292 @@ +import random +from typing import Any + +import genesis as gs +import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardCommand + +from gs_env.common.bases.base_env import BaseEnv +from gs_env.sim.envs.config.schema import EnvArgs +from gs_env.sim.robots.config.schema import EEPoseAbsAction +from gs_env.sim.robots.manipulators import FrankaRobot + +_DEFAULT_DEVICE = torch.device("cpu") + + +class HangLifebuoyEnv(BaseEnv): + """Hang lifebuoy on hanger environment.""" + + def __init__( + self, + args: EnvArgs, + num_envs: int = 1, + device: torch.device = _DEFAULT_DEVICE, + ) -> None: + super().__init__(device=device) + self._num_envs = num_envs + FPS = 60 + # Create Genesis scene + self.scene = gs.Scene( + sim_options=gs.options.SimOptions( + substeps=4, + dt=1 / FPS, + ), + rigid_options=gs.options.RigidOptions( + enable_joint_limit=True, + enable_collision=True, + gravity=(0, 0, -9.8), + box_box_detection=True, + constraint_timeconst=0.02, + ), + viewer_options=gs.options.ViewerOptions( + camera_pos=(1.5, 0.0, 0.7), + camera_lookat=(0.2, 0.0, 0.1), + camera_fov=50, + max_FPS=200, + ), + show_viewer=bool(args.env_config.get("show_viewer", True)), + show_FPS=bool(args.env_config.get("show_FPS", False)), + ) + + # Add entities + self.entities = {} + + # Ground plane + self.entities["plane"] = self.scene.add_entity(gs.morphs.Plane()) + + # SO101 robot + self.entities["robot"] = FrankaRobot( + num_envs=self._num_envs, + scene=self.scene, # use flat scene + args=args.robot_args, + device=self.device, + ) + + # Add camera for image capture + self._setup_camera() + + # Table + table_pos = args.env_config.get("table_pos", (0.0, 0.0, 0.05)) + table_size = args.env_config.get("table_size", (0.6, 0.6, 0.1)) + self.entities["table"] = self.scene.add_entity( + morph=gs.morphs.Box( + pos=table_pos, + size=table_size, + ), + ) + + # Lifebuoy (using the lifebuoy.glb from assets) + lifebuoy_scale = args.env_config.get("lifebuoy_scale", 0.03) + self.entities["lifebuoy"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="assets/lifebuoy.glb", + pos=(0.05, 0.2, 0.15), + euler=(0, 0, 90), + scale=lifebuoy_scale, + collision=True, + ), + ) + + # Hanger (using the hanger.glb from assets) + hanger_pos_raw = args.env_config.get("hanger_pos", (0.05, -0.2, 0.15)) + hanger_euler_raw = args.env_config.get("hanger_euler", (90, 0, 90)) + hanger_scale_raw = args.env_config.get("hanger_scale", (10, 5, 10)) + hanger_pos = ( + tuple(hanger_pos_raw) + if isinstance(hanger_pos_raw, list | tuple) + else (0.05, -0.2, 0.15) + ) + hanger_euler = ( + tuple(hanger_euler_raw) if isinstance(hanger_euler_raw, list | tuple) else (90, 0, 90) + ) + hanger_scale = ( + tuple(hanger_scale_raw) if isinstance(hanger_scale_raw, list | tuple) else (10, 5, 10) + ) + self.entities["hanger"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="assets/hanger.glb", + pos=hanger_pos, + euler=hanger_euler, + scale=hanger_scale, + collision=True, + ), + ) + + self.entities["ee_frame"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="meshes/axis.obj", + scale=0.15, + collision=False, + ), + ) + + # Build scene + self.scene.build(n_envs=1) + + # Command handling + self.last_command = None + + # Store entities for easy access + self.robot = self.entities["robot"] + + # Initialize with randomized positions + self._randomize_objects() + + # Track current target point for visualization + self.current_target_pos = None + + def _setup_camera( + self, + pos: tuple[float, float, float] = (1.5, 0.0, 0.7), + lookat: tuple[float, float, float] = (0.2, 0.0, 0.1), + fov: int = 50, + resolution: tuple[int, int] = (640, 480), + ) -> None: + """Setup camera for image capture using Genesis camera renderer.""" + # Add camera to scene (Genesis-specific) + self.camera = self.scene.add_camera( + res=resolution, + pos=pos, # Camera position + lookat=lookat, # Camera lookat point + fov=fov, # Field of view + GUI=False, # Don't show in GUI + ) + + def get_rgb_image(self, normalize: bool = True) -> torch.Tensor | None: + """Capture RGB image from camera.""" + try: + # Render camera image (Genesis-specific) + rgb, _, _, _ = self.camera.render( + rgb=True, depth=False, segmentation=False, normal=False + ) + + # Use base class processing logic + return self._process_rgb_tensor(rgb, normalize) + except Exception as e: + print(f"Warning: Could not capture camera image: {e}") + return None + + def initialize(self) -> None: + """Initialize the environment.""" + # Set lifebuoy mass + self.entities["lifebuoy"].set_mass(0.01) + + def _randomize_objects(self) -> None: + """Randomize object positions.""" + # Randomize lifebuoy position on table + lifebuoy_x = random.uniform(-0.1, 0.1) + lifebuoy_y = random.uniform(0.1, 0.3) + lifebuoy_pos = torch.tensor([lifebuoy_x, lifebuoy_y, 0.15], dtype=torch.float32) + self.entities["lifebuoy"].set_pos(lifebuoy_pos) + + # Hanger position is fixed + hanger_pos = torch.tensor([0.05, -0.2, 0.15], dtype=torch.float32) + self.entities["hanger"].set_pos(hanger_pos) + + def apply_action(self, action: torch.Tensor | KeyboardCommand) -> None: + """Apply action to the environment (BaseEnv requirement).""" + # Skip empty tensors from teleop wrapper + if isinstance(action, torch.Tensor) and action.numel() == 0: + return + + # Apply command object from teleop + self.last_command = action + + # Type narrowing: at this point, action is a KeyboardCommand + if isinstance(action, KeyboardCommand): + pos_quat = torch.concat([action.position, action.orientation], -1) + self.entities["ee_frame"].set_qpos(pos_quat) + # Apply action to robot + + robot_action = EEPoseAbsAction( + ee_link_pos=action.position, + ee_link_quat=action.orientation, + gripper_width=0.0 if action.gripper_close else 0.04, + ) + self.entities["robot"].apply_action(robot_action) + + # Handle special commands (if needed in the future) + # Note: KeyboardCommand doesn't currently support reset_scene or quit_teleop + + # Step the scene (like goal_reaching_env) + self.scene.step() + + def get_observations(self) -> dict[str, Any]: + """Get current observation as dictionary (BaseEnv requirement).""" + observations = { + "ee_pose": self.entities["robot"].ee_pose, + "joint_positions": self.entities["robot"].joint_positions, + "lifebuoy_pos": self.entities["lifebuoy"].get_pos(), + "lifebuoy_quat": self.entities["lifebuoy"].get_quat(), + "hanger_pos": self.entities["hanger"].get_pos(), + "hanger_quat": self.entities["hanger"].get_quat(), + } + + # Add RGB images using base class helper + return self._add_rgb_to_observations(observations) + + def get_ee_pose(self) -> tuple[torch.Tensor, torch.Tensor]: + """Get end-effector pose for teleop wrapper.""" + robot_pos = self.entities["robot"].ee_pose + return robot_pos[..., :3], robot_pos[..., 3:] + + def get_extra_infos(self) -> dict[str, Any]: + """Get extra information.""" + return {} + + def get_terminated(self) -> torch.Tensor: + """Get termination status.""" + return torch.tensor([False]) + + def get_truncated(self) -> torch.Tensor: + """Get truncation status.""" + return torch.tensor([False]) + + def get_reward(self) -> tuple[torch.Tensor, dict[str, torch.Tensor]]: + """Get reward.""" + return torch.tensor([0.0]), {} + + def is_episode_complete(self) -> torch.Tensor: + """Check if episode is complete - lifebuoy is hanging on hanger.""" + # Get AABB (Axis-Aligned Bounding Box) for both objects + lifebuoy_aabb = self.entities["lifebuoy"].get_AABB() # [2, 3] - min and max corners + hanger_aabb = self.entities["hanger"].get_AABB() # [2, 3] - min and max corners + + # Check if lifebuoy is in contact with hanger using AABB intersection + # Lifebuoy is hanging if its AABB intersects with hanger AABB + lifebuoy_min, lifebuoy_max = lifebuoy_aabb[0], lifebuoy_aabb[1] + hanger_min, hanger_max = hanger_aabb[0], hanger_aabb[1] + + # Check if lifebuoy is in contact with hanger (with some tolerance) + tolerance = 0.05 # 5cm tolerance + is_contacting = torch.all( + (lifebuoy_min <= hanger_max + tolerance) & (lifebuoy_max >= hanger_min - tolerance) + ) + + return torch.tensor([is_contacting]) + + def reset_idx(self, envs_idx: Any) -> None: + """Reset environment.""" + # Clear any existing debug objects + self.scene.clear_debug_objects() + + # Reset robot to natural pose + initial_q = torch.tensor( + [0.0, -0.3, 0.5, 0.0, 0.0, 0.0, 0.0], dtype=torch.float32 + ) # 7 joints to match registry format + self.entities["robot"].reset_to_pose(initial_q) + + # Randomize object positions + self._randomize_objects() + + # Reset target visualization + self.current_target_pos = None + + def step(self) -> None: + """Step the environment.""" + self.scene.step() + + @property + def num_envs(self) -> int: + """Get number of environments.""" + return self._num_envs diff --git a/src/env/gs_env/sim/envs/manipulation/pick_cube_env.py b/src/env/gs_env/sim/envs/manipulation/pick_cube_env.py index cab301f..68b52b0 100644 --- a/src/env/gs_env/sim/envs/manipulation/pick_cube_env.py +++ b/src/env/gs_env/sim/envs/manipulation/pick_cube_env.py @@ -4,6 +4,7 @@ import genesis as gs import numpy as np import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardCommand from numpy.typing import NDArray from scipy.spatial.transform import Rotation as R @@ -21,11 +22,11 @@ class PickCubeEnv(BaseEnv): def __init__( self, args: EnvArgs, + num_envs: int = 1, device: torch.device = _DEFAULT_DEVICE, ) -> None: super().__init__(device=device) - self._device = device - self._num_envs = 1 # Single environment for teleop + self._num_envs = num_envs FPS = 60 # Create Genesis scene self.scene = gs.Scene( @@ -46,8 +47,8 @@ def __init__( camera_fov=50, max_FPS=200, ), - show_viewer=True, # Enable viewer for visualization - show_FPS=False, + show_viewer=bool(args.env_config.get("show_viewer", True)), + show_FPS=bool(args.env_config.get("show_FPS", False)), ) # Add entities @@ -64,11 +65,16 @@ def __init__( device=self.device, ) + # Add camera for image capture + self._setup_camera() + # Interactive cube + cube_pos = args.env_config.get("cube_pos", (0.5, 0.0, 0.07)) + cube_size = args.env_config.get("cube_size", (0.04, 0.04, 0.04)) self.entities["cube"] = self.scene.add_entity( morph=gs.morphs.Box( - pos=(0.5, 0.0, 0.07), - size=(0.04, 0.04, 0.04), + pos=cube_pos, + size=cube_size, ), ) @@ -95,6 +101,37 @@ def __init__( # Track current target point for visualization self.current_target_pos = None + def _setup_camera( + self, + pos: tuple[float, float, float] = (1.5, 0.0, 0.7), + lookat: tuple[float, float, float] = (0.2, 0.0, 0.1), + fov: int = 50, + resolution: tuple[int, int] = (640, 480), + ) -> None: + """Setup camera for image capture using Genesis camera renderer.""" + # Add camera to scene (similar to your example) + self.camera = self.scene.add_camera( + res=resolution, + pos=pos, # Camera position + lookat=lookat, # Camera lookat point + fov=fov, # Field of view + GUI=False, # Don't show in GUI + ) + + def get_rgb_image(self, normalize: bool = True) -> torch.Tensor | None: + """Capture RGB image from camera.""" + try: + # Render camera image (Genesis-specific) + rgb, _, _, _ = self.camera.render( + rgb=True, depth=False, segmentation=False, normal=False + ) + + # Use base class processing logic + return self._process_rgb_tensor(rgb, normalize) + except Exception as e: + print(f"Warning: Could not capture camera image: {e}") + return None + def initialize(self) -> None: """Initialize the environment.""" # Clear any existing debug objects @@ -108,54 +145,45 @@ def initialize(self) -> None: self._randomize_cube() # TODO: should not use Any but KeyboardCommand - def apply_action(self, action: Any) -> None: + def apply_action(self, action: torch.Tensor | KeyboardCommand) -> None: """Apply action to the environment (BaseEnv requirement).""" - # For teleop, action might be a command object instead of tensor - if action is not None: - # This is a command object from teleop - self.last_command = action + # Skip empty tensors from teleop wrapper + if isinstance(action, torch.Tensor) and action.numel() == 0: + return + + # Apply command object from teleop + self.last_command = action + # Type narrowing: at this point, action is a KeyboardCommand + if isinstance(action, KeyboardCommand): pos_quat = torch.concat([action.position, action.orientation], -1) self.entities["ee_frame"].set_qpos(pos_quat) # Apply action to robot - action = EEPoseAbsAction( + robot_action = EEPoseAbsAction( ee_link_pos=action.position, ee_link_quat=action.orientation, gripper_width=0.0 if action.gripper_close else 0.04, ) - self.entities["robot"].apply_action(action) + self.entities["robot"].apply_action(robot_action) - # Handle special commands - if hasattr(action, "reset_scene") and action.reset_scene: - self.reset_idx(torch.IntTensor([0])) - elif hasattr(action, "quit_teleop") and action.quit_teleop: - print("Quit command received from teleop") + # Handle special commands (if needed in the future) + # Note: KeyboardCommand doesn't currently support reset_scene or quit_teleop # Step the scene (like goal_reaching_env) self.scene.step() - def get_observations(self) -> torch.Tensor: - """Get current observation as tensor (BaseEnv requirement).""" - ee_pose = self.entities["robot"].ee_pose - joint_pos = self.entities["robot"].joint_positions - - # Get cube position - cube_pos = self.entities["cube"].get_pos() - cube_quat = self.entities["cube"].get_quat() - - # Concatenate all observations into a single tensor - obs_tensor = torch.cat( - [ - joint_pos, - ee_pose, - cube_pos, - cube_quat, - ], - dim=-1, - ) - - return obs_tensor + def get_observations(self) -> dict[str, Any]: + """Get current observation as dictionary (BaseEnv requirement).""" + observations = { + "ee_pose": self.entities["robot"].ee_pose, + "joint_positions": self.entities["robot"].joint_positions, + "cube_pos": self.entities["cube"].get_pos(), + "cube_quat": self.entities["cube"].get_quat(), + } + + # Add RGB images using base class helper + return self._add_rgb_to_observations(observations) def get_ee_pose(self) -> tuple[torch.Tensor, torch.Tensor]: robot_pos = self.entities["robot"].ee_pose diff --git a/src/env/gs_env/sim/envs/manipulation/put_bowl_inside_microwave_env.py b/src/env/gs_env/sim/envs/manipulation/put_bowl_inside_microwave_env.py new file mode 100644 index 0000000..f4b779d --- /dev/null +++ b/src/env/gs_env/sim/envs/manipulation/put_bowl_inside_microwave_env.py @@ -0,0 +1,294 @@ +import random +from typing import Any + +import genesis as gs +import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardCommand + +from gs_env.common.bases.base_env import BaseEnv +from gs_env.sim.envs.config.schema import EnvArgs +from gs_env.sim.robots.config.schema import EEPoseAbsAction +from gs_env.sim.robots.manipulators import FrankaRobot + +_DEFAULT_DEVICE = torch.device("cpu") + + +class PutBowlInsideMicrowaveEnv(BaseEnv): + """Put bowl inside microwave environment.""" + + def __init__( + self, + args: EnvArgs, + num_envs: int = 1, + device: torch.device = _DEFAULT_DEVICE, + ) -> None: + super().__init__(device=device) + self._num_envs = num_envs + FPS = 60 + # Create Genesis scene + self.scene = gs.Scene( + sim_options=gs.options.SimOptions( + substeps=4, + dt=1 / FPS, + ), + rigid_options=gs.options.RigidOptions( + enable_joint_limit=True, + enable_collision=True, + gravity=(0, 0, -9.8), + box_box_detection=True, + constraint_timeconst=0.02, + ), + viewer_options=gs.options.ViewerOptions( + camera_pos=(1.5, 0.0, 0.7), + camera_lookat=(0.2, 0.0, 0.1), + camera_fov=50, + max_FPS=200, + ), + show_viewer=bool(args.env_config.get("show_viewer", True)), + show_FPS=bool(args.env_config.get("show_FPS", False)), + ) + + # Add entities + self.entities = {} + + # Ground plane + self.entities["plane"] = self.scene.add_entity(gs.morphs.Plane()) + + # SO101 robot + self.entities["robot"] = FrankaRobot( + num_envs=self._num_envs, + scene=self.scene, # use flat scene + args=args.robot_args, + device=self.device, + ) + + # Add camera for image capture + self._setup_camera() + + # Table + table_pos = args.env_config.get("table_pos", (0.0, 0.0, 0.05)) + table_size = args.env_config.get("table_size", (0.6, 0.6, 0.1)) + self.entities["table"] = self.scene.add_entity( + morph=gs.morphs.Box( + pos=table_pos, + size=table_size, + ), + ) + + # Bowl (using the winter_bowl.glb from assets) + bowl_scale = args.env_config.get("bowl_scale", 1 / 5000) + self.entities["bowl"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="assets/winter_bowl.glb", + pos=(0.05, -0.2, 0.15), + euler=(90, 0, 90), + scale=bowl_scale, + collision=True, + ), + ) + + # Microwave (using the 7310 URDF from assets) + microwave_pos = args.env_config.get("microwave_pos", (0.2, 0.2, 0.18)) + microwave_euler = args.env_config.get("microwave_euler", (0, 0, 30)) + microwave_scale = args.env_config.get("microwave_scale", 0.3) + self.entities["microwave"] = self.scene.add_entity( + morph=gs.morphs.URDF( + file="assets/7310/mobility.urdf", + pos=microwave_pos, + euler=microwave_euler, + scale=microwave_scale, + collision=True, + merge_fixed_links=True, + convexify=False, + ), + ) + + self.entities["ee_frame"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="meshes/axis.obj", + scale=0.15, + collision=False, + ), + ) + + # Build scene + self.scene.build(n_envs=1) + + # Command handling + self.last_command = None + + # Store entities for easy access + self.robot = self.entities["robot"] + + # Initialize with randomized positions + self._randomize_objects() + + # Track current target point for visualization + self.current_target_pos = None + + def _setup_camera( + self, + pos: tuple[float, float, float] = (1.5, 0.0, 0.7), + lookat: tuple[float, float, float] = (0.2, 0.0, 0.1), + fov: int = 50, + resolution: tuple[int, int] = (640, 480), + ) -> None: + """Setup camera for image capture using Genesis camera renderer.""" + # Add camera to scene (Genesis-specific) + self.camera = self.scene.add_camera( + res=resolution, + pos=pos, # Camera position + lookat=lookat, # Camera lookat point + fov=fov, # Field of view + GUI=False, # Don't show in GUI + ) + + def get_rgb_image(self, normalize: bool = True) -> torch.Tensor | None: + """Capture RGB image from camera.""" + try: + # Render camera image (Genesis-specific) + rgb, _, _, _ = self.camera.render( + rgb=True, depth=False, segmentation=False, normal=False + ) + + # Use base class processing logic + return self._process_rgb_tensor(rgb, normalize) + except Exception as e: + print(f"Warning: Could not capture camera image: {e}") + return None + + def initialize(self) -> None: + """Initialize the environment.""" + # Set bowl mass + self.entities["bowl"].set_mass(0.01) + + # Set microwave door damping + # Set damping for microwave (8 DOFs: 3 pos + 4 quat + 1 joint) + self.entities["microwave"].set_dofs_damping([0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 1.0]) + + def _randomize_objects(self) -> None: + """Randomize object positions.""" + # Randomize bowl position on table + bowl_x = random.uniform(-0.1, 0.1) + bowl_y = random.uniform(-0.2, 0.0) + bowl_pos = torch.tensor([bowl_x, bowl_y, 0.15], dtype=torch.float32) + self.entities["bowl"].set_pos(bowl_pos) + + # Microwave position is fixed + microwave_pos = torch.tensor([0.2, 0.2, 0.18], dtype=torch.float32) + self.entities["microwave"].set_pos(microwave_pos) + + def apply_action(self, action: torch.Tensor | KeyboardCommand) -> None: + """Apply action to the environment (BaseEnv requirement).""" + # Skip empty tensors from teleop wrapper + if isinstance(action, torch.Tensor) and action.numel() == 0: + return + + # Apply command object from teleop + self.last_command = action + + # Type narrowing: at this point, action is a KeyboardCommand + if isinstance(action, KeyboardCommand): + pos_quat = torch.concat([action.position, action.orientation], -1) + self.entities["ee_frame"].set_qpos(pos_quat) + # Apply action to robot + + robot_action = EEPoseAbsAction( + ee_link_pos=action.position, + ee_link_quat=action.orientation, + gripper_width=0.0 if action.gripper_close else 0.04, + ) + self.entities["robot"].apply_action(robot_action) + + # Handle special commands (if needed in the future) + # Note: KeyboardCommand doesn't currently support reset_scene or quit_teleop + + # Step the scene (like goal_reaching_env) + self.scene.step() + + def get_observations(self) -> dict[str, Any]: + """Get current observation as dictionary (BaseEnv requirement).""" + observations = { + "ee_pose": self.entities["robot"].ee_pose, + "joint_positions": self.entities["robot"].joint_positions, + "bowl_pos": self.entities["bowl"].get_pos(), + "bowl_quat": self.entities["bowl"].get_quat(), + "microwave_pos": self.entities["microwave"].get_pos(), + "microwave_quat": self.entities["microwave"].get_quat(), + } + + # Add RGB images using base class helper + return self._add_rgb_to_observations(observations) + + def get_ee_pose(self) -> tuple[torch.Tensor, torch.Tensor]: + """Get end-effector pose for teleop wrapper.""" + robot_pos = self.entities["robot"].ee_pose + return robot_pos[..., :3], robot_pos[..., 3:] + + def get_extra_infos(self) -> dict[str, Any]: + """Get extra information.""" + return {} + + def get_terminated(self) -> torch.Tensor: + """Get termination status.""" + return torch.tensor([False]) + + def get_truncated(self) -> torch.Tensor: + """Get truncation status.""" + return torch.tensor([False]) + + def get_reward(self) -> tuple[torch.Tensor, dict[str, torch.Tensor]]: + """Get reward.""" + return torch.tensor([0.0]), {} + + def is_episode_complete(self) -> torch.Tensor: + """Check if episode is complete - bowl is inside microwave.""" + # Get AABB (Axis-Aligned Bounding Box) for both objects + bowl_aabb = self.entities["bowl"].get_AABB() # [2, 3] - min and max corners + microwave_aabb = self.entities["microwave"].get_AABB() # [2, 3] - min and max corners + + # Check if bowl is inside microwave using AABB intersection + # Bowl is inside if its AABB is completely contained within microwave AABB + bowl_min, bowl_max = bowl_aabb[0], bowl_aabb[1] + microwave_min, microwave_max = microwave_aabb[0], microwave_aabb[1] + + # Check if bowl is inside microwave (with some tolerance) + tolerance = 0.05 # 5cm tolerance + is_inside = torch.all( + (bowl_min >= microwave_min - tolerance) & (bowl_max <= microwave_max + tolerance) + ) + + return torch.tensor([is_inside]) + + def reset_idx(self, envs_idx: Any) -> None: + """Reset environment.""" + # Clear any existing debug objects + self.scene.clear_debug_objects() + + # Reset robot to natural pose + initial_q = torch.tensor( + [0.0, -0.3, 0.5, 0.0, 0.0, 0.0, 0.0], dtype=torch.float32 + ) # 7 joints to match registry format + self.entities["robot"].reset_to_pose(initial_q) + + # Reset microwave door to closed position + current_qpos = self.entities["microwave"].get_qpos() + # Keep position and orientation, but set door joint to 0 (closed) + reset_qpos = current_qpos.clone() + reset_qpos[0, 7] = 0.0 # Set door joint to closed position + self.entities["microwave"].set_qpos(reset_qpos) + + # Randomize object positions + self._randomize_objects() + + # Reset target visualization + self.current_target_pos = None + + def step(self) -> None: + """Step the environment.""" + self.scene.step() + + @property + def num_envs(self) -> int: + """Get number of environments.""" + return self._num_envs diff --git a/src/env/gs_env/sim/envs/manipulation/sweep_table_env.py b/src/env/gs_env/sim/envs/manipulation/sweep_table_env.py new file mode 100644 index 0000000..87cd23f --- /dev/null +++ b/src/env/gs_env/sim/envs/manipulation/sweep_table_env.py @@ -0,0 +1,327 @@ +import random +from typing import Any + +import genesis as gs +import torch +from gs_agent.wrappers.teleop_wrapper import KeyboardCommand + +from gs_env.common.bases.base_env import BaseEnv +from gs_env.sim.envs.config.schema import EnvArgs +from gs_env.sim.robots.config.schema import EEPoseAbsAction +from gs_env.sim.robots.manipulators import FrankaRobot + +_DEFAULT_DEVICE = torch.device("cpu") + + +class SweepTableEnv(BaseEnv): + """Sweep table environment - sweep trashboxes to target zone.""" + + def __init__( + self, + args: EnvArgs, + num_envs: int = 1, + device: torch.device = _DEFAULT_DEVICE, + ) -> None: + super().__init__(device=device) + self._num_envs = num_envs + FPS = 60 + # Create Genesis scene + self.scene = gs.Scene( + sim_options=gs.options.SimOptions( + substeps=4, + dt=1 / FPS, + ), + rigid_options=gs.options.RigidOptions( + enable_joint_limit=True, + enable_collision=True, + gravity=(0, 0, -9.8), + box_box_detection=True, + constraint_timeconst=0.02, + ), + viewer_options=gs.options.ViewerOptions( + camera_pos=(1.5, 0.0, 0.7), + camera_lookat=(0.2, 0.0, 0.1), + camera_fov=50, + max_FPS=200, + ), + show_viewer=bool(args.env_config.get("show_viewer", True)), + show_FPS=bool(args.env_config.get("show_FPS", False)), + ) + + # Add entities + self.entities = {} + + # Ground plane + self.entities["plane"] = self.scene.add_entity(gs.morphs.Plane()) + + # SO101 robot + self.entities["robot"] = FrankaRobot( + num_envs=self._num_envs, + scene=self.scene, # use flat scene + args=args.robot_args, + device=self.device, + ) + + # Add camera for image capture + self._setup_camera() + + # Table + table_pos = args.env_config.get("table_pos", (0.0, 0.0, 0.05)) + table_size = args.env_config.get("table_size", (0.6, 0.6, 0.1)) + self.entities["table"] = self.scene.add_entity( + morph=gs.morphs.Box( + pos=table_pos, + size=table_size, + ), + ) + + # Broom (using the broom.glb from assets) + broom_pos_raw = args.env_config.get("broom_pos", (0.05, -0.2, 0.15)) + broom_euler_raw = args.env_config.get("broom_euler", (90, 0, 90)) + broom_scale_raw = args.env_config.get("broom_scale", (1 / 400, 1 / 800, 1 / 400)) + broom_pos = ( + tuple(broom_pos_raw) if isinstance(broom_pos_raw, list | tuple) else (0.05, -0.2, 0.15) + ) + broom_euler = ( + tuple(broom_euler_raw) if isinstance(broom_euler_raw, list | tuple) else (90, 0, 90) + ) + broom_scale = ( + tuple(broom_scale_raw) + if isinstance(broom_scale_raw, list | tuple) + else (1 / 400, 1 / 800, 1 / 400) + ) + self.entities["broom"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="assets/broom.glb", + pos=broom_pos, + euler=broom_euler, + scale=broom_scale, + collision=True, + ), + ) + + # Trashbox A + trashbox_size = args.env_config.get("trashbox_size", (0.03, 0.03, 0.03)) + self.entities["trashbox_a"] = self.scene.add_entity( + morph=gs.morphs.Box( + pos=(0.15, 0.0, 0.15), + size=trashbox_size, + ), + ) + + # Trashbox B + self.entities["trashbox_b"] = self.scene.add_entity( + morph=gs.morphs.Box( + pos=(0.15, -0.1, 0.15), + size=trashbox_size, + ), + ) + + # Target zone (red area on table) + target_zone_pos = args.env_config.get("target_zone_pos", (0.1, 0.3, 0.045)) + target_zone_size = args.env_config.get("target_zone_size", (0.3, 0.3, 0.003)) + self.entities["target_zone"] = self.scene.add_entity( + morph=gs.morphs.Box( + pos=target_zone_pos, + size=target_zone_size, + ), + ) + + self.entities["ee_frame"] = self.scene.add_entity( + morph=gs.morphs.Mesh( + file="meshes/axis.obj", + scale=0.15, + collision=False, + ), + ) + + # Build scene + self.scene.build(n_envs=1) + + # Command handling + self.last_command = None + + # Store entities for easy access + self.robot = self.entities["robot"] + + # Initialize with randomized positions + self._randomize_objects() + + # Track current target point for visualization + self.current_target_pos = None + + def _setup_camera( + self, + pos: tuple[float, float, float] = (1.5, 0.0, 0.7), + lookat: tuple[float, float, float] = (0.2, 0.0, 0.1), + fov: int = 50, + resolution: tuple[int, int] = (640, 480), + ) -> None: + """Setup camera for image capture using Genesis camera renderer.""" + # Add camera to scene (Genesis-specific) + self.camera = self.scene.add_camera( + res=resolution, + pos=pos, # Camera position + lookat=lookat, # Camera lookat point + fov=fov, # Field of view + GUI=False, # Don't show in GUI + ) + + def get_rgb_image(self, normalize: bool = True) -> torch.Tensor | None: + """Capture RGB image from camera.""" + try: + # Render camera image (Genesis-specific) + rgb, _, _, _ = self.camera.render( + rgb=True, depth=False, segmentation=False, normal=False + ) + + # Use base class processing logic + return self._process_rgb_tensor(rgb, normalize) + except Exception as e: + print(f"Warning: Could not capture camera image: {e}") + return None + + def initialize(self) -> None: + """Initialize the environment.""" + # Set object masses + self.entities["broom"].set_mass(0.05) + self.entities["trashbox_a"].set_mass(0.005) + self.entities["trashbox_b"].set_mass(0.005) + + def _randomize_objects(self) -> None: + """Randomize object positions.""" + # Randomize trashbox positions on table + trashbox_a_x = random.uniform(0.1, 0.2) + trashbox_a_y = random.uniform(-0.05, 0.05) + trashbox_a_pos = torch.tensor([trashbox_a_x, trashbox_a_y, 0.15], dtype=torch.float32) + self.entities["trashbox_a"].set_pos(trashbox_a_pos) + + trashbox_b_x = random.uniform(0.1, 0.2) + trashbox_b_y = random.uniform(-0.15, -0.05) + trashbox_b_pos = torch.tensor([trashbox_b_x, trashbox_b_y, 0.15], dtype=torch.float32) + self.entities["trashbox_b"].set_pos(trashbox_b_pos) + + # Broom position is fixed + broom_pos = torch.tensor([0.05, -0.2, 0.15], dtype=torch.float32) + self.entities["broom"].set_pos(broom_pos) + + # Target zone position is fixed + target_zone_pos = torch.tensor([0.1, 0.3, 0.045], dtype=torch.float32) + self.entities["target_zone"].set_pos(target_zone_pos) + + def apply_action(self, action: torch.Tensor | KeyboardCommand) -> None: + """Apply action to the environment (BaseEnv requirement).""" + # Skip empty tensors from teleop wrapper + if isinstance(action, torch.Tensor) and action.numel() == 0: + return + + # Apply command object from teleop + self.last_command = action + + # Type narrowing: at this point, action is a KeyboardCommand + if isinstance(action, KeyboardCommand): + pos_quat = torch.concat([action.position, action.orientation], -1) + self.entities["ee_frame"].set_qpos(pos_quat) + # Apply action to robot + + robot_action = EEPoseAbsAction( + ee_link_pos=action.position, + ee_link_quat=action.orientation, + gripper_width=0.0 if action.gripper_close else 0.04, + ) + self.entities["robot"].apply_action(robot_action) + + # Handle special commands (if needed in the future) + # Note: KeyboardCommand doesn't currently support reset_scene or quit_teleop + + # Step the scene (like goal_reaching_env) + self.scene.step() + + def get_observations(self) -> dict[str, Any]: + """Get current observation as dictionary (BaseEnv requirement).""" + observations = { + "ee_pose": self.entities["robot"].ee_pose, + "joint_positions": self.entities["robot"].joint_positions, + "broom_pos": self.entities["broom"].get_pos(), + "broom_quat": self.entities["broom"].get_quat(), + "trashbox_a_pos": self.entities["trashbox_a"].get_pos(), + "trashbox_a_quat": self.entities["trashbox_a"].get_quat(), + "trashbox_b_pos": self.entities["trashbox_b"].get_pos(), + "trashbox_b_quat": self.entities["trashbox_b"].get_quat(), + "target_zone_pos": self.entities["target_zone"].get_pos(), + "target_zone_quat": self.entities["target_zone"].get_quat(), + } + + # Add RGB images using base class helper + return self._add_rgb_to_observations(observations) + + def get_ee_pose(self) -> tuple[torch.Tensor, torch.Tensor]: + """Get end-effector pose for teleop wrapper.""" + robot_pos = self.entities["robot"].ee_pose + return robot_pos[..., :3], robot_pos[..., 3:] + + def get_extra_infos(self) -> dict[str, Any]: + """Get extra information.""" + return {} + + def get_terminated(self) -> torch.Tensor: + """Get termination status.""" + return torch.tensor([False]) + + def get_truncated(self) -> torch.Tensor: + """Get truncation status.""" + return torch.tensor([False]) + + def get_reward(self) -> tuple[torch.Tensor, dict[str, torch.Tensor]]: + """Get reward.""" + return torch.tensor([0.0]), {} + + def is_episode_complete(self) -> torch.Tensor: + """Check if episode is complete - both trashboxes are in target zone.""" + # Get AABB (Axis-Aligned Bounding Box) for all objects + trashbox_a_aabb = self.entities["trashbox_a"].get_AABB() # [2, 3] - min and max corners + trashbox_b_aabb = self.entities["trashbox_b"].get_AABB() # [2, 3] - min and max corners + target_zone_aabb = self.entities["target_zone"].get_AABB() # [2, 3] - min and max corners + + # Check if trashbox_a is in target zone + trashbox_a_center = trashbox_a_aabb.mean(dim=0) + target_zone_min, target_zone_max = target_zone_aabb[0], target_zone_aabb[1] + + trashbox_a_inside = torch.all( + (trashbox_a_center >= target_zone_min) & (trashbox_a_center <= target_zone_max) + ) + + # Check if trashbox_b is in target zone + trashbox_b_center = trashbox_b_aabb.mean(dim=0) + trashbox_b_inside = torch.all( + (trashbox_b_center >= target_zone_min) & (trashbox_b_center <= target_zone_max) + ) + + # Both trashboxes must be in target zone + return torch.tensor([trashbox_a_inside and trashbox_b_inside]) + + def reset_idx(self, envs_idx: Any) -> None: + """Reset environment.""" + # Clear any existing debug objects + self.scene.clear_debug_objects() + + # Reset robot to natural pose + initial_q = torch.tensor( + [0.0, -0.3, 0.5, 0.0, 0.0, 0.0, 0.0], dtype=torch.float32 + ) # 7 joints to match registry format + self.entities["robot"].reset_to_pose(initial_q) + + # Randomize object positions + self._randomize_objects() + + # Reset target visualization + self.current_target_pos = None + + def step(self) -> None: + """Step the environment.""" + self.scene.step() + + @property + def num_envs(self) -> int: + """Get number of environments.""" + return self._num_envs