diff --git a/example.py b/example.py
index 2e9778e..7233ae4 100644
--- a/example.py
+++ b/example.py
@@ -1,7 +1,7 @@
 import gymnasium as gym
 import gym_pusht
 
-env = gym.make("gym_pusht/PushT-v0", render_mode="human")
+env = gym.make("gym_pusht/PushT-v1", render_mode="human")
 observation, info = env.reset()
 
 for _ in range(1000):
diff --git a/gym_pusht/__init__.py b/gym_pusht/__init__.py
index 93fb999..6686bfc 100644
--- a/gym_pusht/__init__.py
+++ b/gym_pusht/__init__.py
@@ -4,5 +4,13 @@
     id="gym_pusht/PushT-v0",
     entry_point="gym_pusht.envs:PushTEnv",
     max_episode_steps=300,
-    kwargs={"obs_type": "state"},
+    kwargs={"obs_type": "state", "randomize_goal": False},
+)
+
+# Register a version with randomized goal
+register(
+    id="gym_pusht/PushT-v1",
+    entry_point="gym_pusht.envs:PushTEnv",
+    max_episode_steps=300,
+    kwargs={"obs_type": "state", "randomize_goal": True},
 )
diff --git a/gym_pusht/envs/pusht.py b/gym_pusht/envs/pusht.py
index 73c9a35..22f8045 100644
--- a/gym_pusht/envs/pusht.py
+++ b/gym_pusht/envs/pusht.py
@@ -54,13 +54,15 @@ class PushTEnv(gym.Env):
 
     If `obs_type` is set to `state`, the observation space is a 5-dimensional vector representing the state of the
     environment: [agent_x, agent_y, block_x, block_y, block_angle]. The values are in the range [0, 512] for the agent
-    and block positions and [0, 2*pi] for the block angle.
+    and block positions and [0, 2*pi] for the block angle. When `randomize_goal=True`, the observation is extended by
+    [goal_x, goal_y, goal_theta] to become 8-dimensional.
 
     If `obs_type` is set to `environment_state_agent_pos` the observation space is a dictionary with:
         - `environment_state`: 16-dimensional vector representing the keypoint locations of the T (in [x0, y0, x1, y1, ...]
           format). The values are in the range [0, 512]. See `get_keypoints` for a diagram showing the location of the
          keypoint indices.
         - `agent_pos`: A 2-dimensional vector representing the position of the robot end-effector.
+        - `goal_state`: When `randomize_goal=True`, includes a 16-dimensional vector representing the keypoint locations
+          of the goal T shape in the same format as `environment_state`.
 
     If `obs_type` is set to `pixels`, the observation space is a 96x96 RGB image of the environment.
@@ -105,6 +107,8 @@ class PushTEnv(gym.Env):
 
     * `visualization_height`: (int) The height of the visualized image. Default is `680`.
 
+    * `randomize_goal`: (bool) Whether to randomize the goal pose (position and angle) on reset. Default is `False`.
+
     ## Reset Arguments
 
     Passing the option `options["reset_to_state"]` will reset the environment to a specific state.
@@ -144,11 +148,15 @@ def __init__(
         observation_height=96,
         visualization_width=680,
         visualization_height=680,
+        randomize_goal=False,
     ):
         super().__init__()
         # Observations
         self.obs_type = obs_type
 
+        # Goal Randomization
+        self.randomize_goal = randomize_goal
+
         # Rendering
         self.render_mode = render_mode
         self.observation_width = observation_width
@@ -167,6 +175,11 @@
         self.block_cog = block_cog
         self.damping = damping
 
+        # Safe margins from walls for positioning objects
+        self.margin = 140  # Margin from walls to avoid spawning too close to edges
+        self.min_pos = np.array([self.margin, self.margin])
+        self.max_pos = np.array([512 - self.margin, 512 - self.margin])
+
         # If human-rendering is used, `self.window` will be a reference
         # to the window that we draw to. `self.clock` will be a clock that is used
         # to ensure that the environment is rendered at the correct framerate in
@@ -182,12 +195,20 @@ def __init__(
 
     def _initialize_observation_space(self):
         if self.obs_type == "state":
-            # [agent_x, agent_y, block_x, block_y, block_angle]
-            self.observation_space = spaces.Box(
-                low=np.array([0, 0, 0, 0, 0]),
-                high=np.array([512, 512, 512, 512, 2 * np.pi]),
-                dtype=np.float64,
-            )
+            if self.randomize_goal:
+                # [agent_x, agent_y, block_x, block_y, block_angle, goal_x, goal_y, goal_theta]
+                self.observation_space = spaces.Box(
+                    low=np.array([0, 0, 0, 0, 0, 0, 0, 0]),
+                    high=np.array([512, 512, 512, 512, 2 * np.pi, 512, 512, 2 * np.pi]),
+                    dtype=np.float64,
+                )
+            else:
+                # [agent_x, agent_y, block_x, block_y, block_angle]
+                self.observation_space = spaces.Box(
+                    low=np.array([0, 0, 0, 0, 0]),
+                    high=np.array([512, 512, 512, 512, 2 * np.pi]),
+                    dtype=np.float64,
+                )
         elif self.obs_type == "environment_state_agent_pos":
             self.observation_space = spaces.Dict(
                 {
@@ -203,6 +224,12 @@ def _initialize_observation_space(self):
                     ),
                 },
             )
+            if self.randomize_goal:
+                self.observation_space["goal_state"] = spaces.Box(
+                    low=np.zeros(16),
+                    high=np.full((16,), 512),
+                    dtype=np.float64,
+                )
         elif self.obs_type == "pixels":
             self.observation_space = spaces.Box(
                 low=0, high=255, shape=(self.observation_height, self.observation_width, 3), dtype=np.uint8
             )
@@ -269,6 +296,16 @@ def reset(self, seed=None, options=None):
         super().reset(seed=seed)
         self._setup()
 
+        # Randomize goal if enabled
+        if self.randomize_goal:
+            # Randomize goal position and orientation
+            goal_x = self.np_random.uniform(self.min_pos[0], self.max_pos[0])
+            goal_y = self.np_random.uniform(self.min_pos[1], self.max_pos[1])
+            goal_theta = self.np_random.uniform(0, 2 * np.pi)
+            self.goal_pose = np.array([goal_x, goal_y, goal_theta])
+            self._update_goal_shapes()
+
+        # Handle state reset
         if options is not None and options.get("reset_to_state") is not None:
             state = np.array(options.get("reset_to_state"))
         else:
@@ -276,11 +313,11 @@
             rs = np.random.RandomState(seed=seed)
             state = np.array(
                 [
-                    rs.randint(50, 450),
-                    rs.randint(50, 450),
-                    rs.randint(100, 400),
-                    rs.randint(100, 400),
-                    rs.randn() * 2 * np.pi - np.pi,
+                    self.np_random.uniform(self.min_pos[0], self.max_pos[0]),  # agent_x
+                    self.np_random.uniform(self.min_pos[1], self.max_pos[1]),  # agent_y
+                    self.np_random.uniform(self.min_pos[0], self.max_pos[0]),  # block_x
+                    self.np_random.uniform(self.min_pos[1], self.max_pos[1]),  # block_y
+                    self.np_random.uniform(0, 2 * np.pi),  # block_angle
                 ],
                 # dtype=np.float64
             )
@@ -385,14 +422,26 @@ def get_obs(self):
             agent_position = np.array(self.agent.position)
             block_position = np.array(self.block.position)
             block_angle = self.block.angle % (2 * np.pi)
-            return np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64)
+            goal_position = self.goal_pose[:2]
+            goal_angle = self.goal_pose[2]
+            obs = np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64)
+
+            if self.randomize_goal:
+                obs = np.concatenate([obs, goal_position, [goal_angle]], dtype=np.float64)
+
+            return obs
 
         if self.obs_type == "environment_state_agent_pos":
-            return {
+            obs = {
                 "environment_state": self.get_keypoints(self._block_shapes).flatten(),
                 "agent_pos": np.array(self.agent.position),
             }
+            if self.randomize_goal:
+                obs["goal_state"] = self.get_keypoints(self._goal_shapes).flatten()
+
+            return obs
+
         pixels = self._render()
         if self.obs_type == "pixels":
             return pixels
@@ -446,7 +495,9 @@ def _setup(self):
         # Add agent, block, and goal zone
         self.agent = self.add_circle(self.space, (256, 400), 15)
         self.block, self._block_shapes = self.add_tee(self.space, (256, 300), 0)
+        # Default goal pose that will be used if randomization is disabled
         self.goal_pose = np.array([256, 256, np.pi / 4])  # x, y, theta (in radians)
+        self._update_goal_shapes()
 
         if self.block_cog is not None:
             self.block.center_of_gravity = self.block_cog
@@ -466,6 +517,15 @@ def _set_state(self, state):
         # Run physics to take effect
         self.space.step(self.dt)
 
+    def _update_goal_shapes(self):
+        goal_body = self.get_goal_pose_body(self.goal_pose)
+        self._goal_shapes = []
+        for shape in self.block.shapes:
+            verts = shape.get_vertices()
+            new_shape = pymunk.Poly(goal_body, verts)
+            self._goal_shapes.append(new_shape)
+        self._goal_body = goal_body
+
     @staticmethod
     def add_segment(space, a, b, radius):
         # TODO(rcadene): rename add_segment to make_segment, since it is not added to the space
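Usage note (not part of the patch): a minimal sketch of how the new PushT-v1 registration could be exercised, assuming the patched gym_pusht above is installed. The shapes in the comments follow from the observation-space changes in this diff; the seed and render_mode are arbitrary choices for a quick check.

    import gymnasium as gym
    import gym_pusht  # noqa: F401  (importing registers gym_pusht/PushT-v0 and gym_pusht/PushT-v1)

    # With obs_type="state" and randomize_goal=True, observations are 8-dimensional:
    # [agent_x, agent_y, block_x, block_y, block_angle, goal_x, goal_y, goal_theta].
    env = gym.make("gym_pusht/PushT-v1", render_mode="rgb_array")
    obs, info = env.reset(seed=0)
    print(obs.shape)  # (8,)
    print(obs[5:])    # goal pose sampled for this episode
    env.close()

    # gym.make forwards extra keyword arguments to the constructor, so obs_type can be
    # overridden; the keypoint observation then carries the extra "goal_state" entry.
    env = gym.make("gym_pusht/PushT-v1", obs_type="environment_state_agent_pos", render_mode="rgb_array")
    obs, info = env.reset(seed=0)
    print(sorted(obs))              # ['agent_pos', 'environment_state', 'goal_state']
    print(obs["goal_state"].shape)  # (16,) goal T keypoints in [x0, y0, x1, y1, ...] order
    env.close()

A policy trained on PushT-v1 therefore has to consume the larger observation, while PushT-v0 keeps the original layout with the fixed goal (randomize_goal=False).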