example.py (2 changes: 1 addition & 1 deletion)
@@ -1,7 +1,7 @@
 import gymnasium as gym
 import gym_pusht
 
-env = gym.make("gym_pusht/PushT-v0", render_mode="human")
+env = gym.make("gym_pusht/PushT-v1", render_mode="human")
 observation, info = env.reset()
 
 for _ in range(1000):
gym_pusht/__init__.py (10 changes: 9 additions & 1 deletion)
@@ -4,5 +4,13 @@
id="gym_pusht/PushT-v0",
entry_point="gym_pusht.envs:PushTEnv",
max_episode_steps=300,
kwargs={"obs_type": "state"},
kwargs={"obs_type": "state", "randomize_goal": False},
)

# Register a version with randomized goal
register(
id="gym_pusht/PushT-v1",
entry_point="gym_pusht.envs:PushTEnv",
max_episode_steps=300,
kwargs={"obs_type": "state", "randomize_goal": True},
)
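The `False` default keeps `PushT-v0` backward compatible, so the new id is a convenience. Worth noting: Gymnasium merges keyword arguments given to `gym.make` over the registered `kwargs`, so the randomized variant is also reachable through the v0 id. A minimal sketch:

import gymnasium as gym
import gym_pusht

# Registered default: PushT-v0 ships with randomize_goal=False.
env_fixed = gym.make("gym_pusht/PushT-v0")

# make-time kwargs override the registered ones, which makes this
# equivalent to gym.make("gym_pusht/PushT-v1").
env_random = gym.make("gym_pusht/PushT-v0", randomize_goal=True)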
gym_pusht/envs/pusht.py (89 changes: 75 additions & 14 deletions)
@@ -54,13 +54,16 @@ class PushTEnv(gym.Env):
 
     If `obs_type` is set to `state`, the observation space is a 5-dimensional vector representing the state of the
     environment: [agent_x, agent_y, block_x, block_y, block_angle]. The values are in the range [0, 512] for the agent
-    and block positions and [0, 2*pi] for the block angle.
+    and block positions and [0, 2*pi] for the block angle. When `randomize_goal=True`, the observation becomes
+    8-dimensional, with additional [goal_x, goal_y, goal_theta] values.
 
     If `obs_type` is set to `environment_state_agent_pos` the observation space is a dictionary with:
     - `environment_state`: 16-dimensional vector representing the keypoint locations of the T (in [x0, y0, x1, y1, ...]
       format). The values are in the range [0, 512]. See `get_keypoints` for a diagram showing the location of the
       keypoint indices.
     - `agent_pos`: A 2-dimensional vector representing the position of the robot end-effector.
+    - `goal_state`: when `randomize_goal=True`, a 16-dimensional vector representing the keypoint locations of the
+      goal T shape, in the same format as `environment_state`.
 
     If `obs_type` is set to `pixels`, the observation space is a 96x96 RGB image of the environment.
 
@@ -105,6 +108,8 @@ class PushTEnv(gym.Env):
 
     * `visualization_height`: (int) The height of the visualized image. Default is `680`.
 
+    * `randomize_goal`: (bool) Whether to randomize the goal pose (position and angle) at each reset. Default is `False`.
+
     ## Reset Arguments
 
     Passing the option `options["reset_to_state"]` will reset the environment to a specific state.
@@ -144,11 +149,15 @@ def __init__(
         observation_height=96,
         visualization_width=680,
         visualization_height=680,
+        randomize_goal=False,
     ):
         super().__init__()
         # Observations
         self.obs_type = obs_type
 
+        # Goal Randomization
+        self.randomize_goal = randomize_goal
+
         # Rendering
         self.render_mode = render_mode
         self.observation_width = observation_width
@@ -167,6 +176,11 @@ def __init__(
         self.block_cog = block_cog
         self.damping = damping
 
+        # Safe margins from walls for positioning objects
+        self.margin = 140  # Margin from walls to avoid spawning too close to edges
+        self.min_pos = np.array([self.margin, self.margin])
+        self.max_pos = np.array([512 - self.margin, 512 - self.margin])
+
         # If human-rendering is used, `self.window` will be a reference
         # to the window that we draw to. `self.clock` will be a clock that is used
         # to ensure that the environment is rendered at the correct framerate in
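The margin arithmetic is worth making concrete: on the 512x512 workspace, a 140 px margin confines every sampled position to [140, 372] on each axis. The PR does not say how 140 was chosen; presumably it is large enough to keep the T block fully on the board at any orientation. A quick check:

import numpy as np

margin = 140
min_pos = np.array([margin, margin])              # [140, 140]
max_pos = np.array([512 - margin, 512 - margin])  # [372, 372]
print(max_pos - min_pos)                          # [232 232], the usable spawn range per axis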
@@ -182,12 +196,20 @@ def __init__(
 
     def _initialize_observation_space(self):
         if self.obs_type == "state":
-            # [agent_x, agent_y, block_x, block_y, block_angle]
-            self.observation_space = spaces.Box(
-                low=np.array([0, 0, 0, 0, 0]),
-                high=np.array([512, 512, 512, 512, 2 * np.pi]),
-                dtype=np.float64,
-            )
+            if self.randomize_goal:
+                # [agent_x, agent_y, block_x, block_y, block_angle, goal_x, goal_y, goal_theta]
+                self.observation_space = spaces.Box(
+                    low=np.array([0, 0, 0, 0, 0, 0, 0, 0]),
+                    high=np.array([512, 512, 512, 512, 2 * np.pi, 512, 512, 2 * np.pi]),
+                    dtype=np.float64,
+                )
+            else:
+                # [agent_x, agent_y, block_x, block_y, block_angle]
+                self.observation_space = spaces.Box(
+                    low=np.array([0, 0, 0, 0, 0]),
+                    high=np.array([512, 512, 512, 512, 2 * np.pi]),
+                    dtype=np.float64,
+                )
         elif self.obs_type == "environment_state_agent_pos":
             self.observation_space = spaces.Dict(
                 {
@@ -203,6 +225,12 @@ def _initialize_observation_space(self):
                     ),
                 },
             )
+            if self.randomize_goal:
+                self.observation_space["goal_state"] = spaces.Box(
+                    low=np.zeros(16),
+                    high=np.full((16,), 512),
+                    dtype=np.float64,
+                )
         elif self.obs_type == "pixels":
             self.observation_space = spaces.Box(
                 low=0, high=255, shape=(self.observation_height, self.observation_width, 3), dtype=np.uint8
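A sketch of the resulting dictionary space from the consumer side (this relies on `gymnasium.spaces.Dict` supporting item assignment, which is what the `goal_state` insertion above uses; the printed shapes assume the PR as written):

import gymnasium as gym
import gym_pusht

env = gym.make("gym_pusht/PushT-v1", obs_type="environment_state_agent_pos")
obs, info = env.reset(seed=0)
print(obs["environment_state"].shape)  # (16,): 8 keypoints of the block T
print(obs["goal_state"].shape)         # (16,): 8 keypoints of the goal T
print(obs["agent_pos"].shape)          # (2,)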
@@ -269,18 +297,28 @@ def reset(self, seed=None, options=None):
         super().reset(seed=seed)
         self._setup()
 
+        # Randomize goal if enabled
+        if self.randomize_goal:
+            # Randomize goal position and orientation
+            goal_x = self.np_random.uniform(self.min_pos[0], self.max_pos[0])
+            goal_y = self.np_random.uniform(self.min_pos[1], self.max_pos[1])
+            goal_theta = self.np_random.uniform(0, 2 * np.pi)
+            self.goal_pose = np.array([goal_x, goal_y, goal_theta])
+            self._update_goal_shapes()
+
+        # Handle state reset
         if options is not None and options.get("reset_to_state") is not None:
             state = np.array(options.get("reset_to_state"))
         else:
             # state = self.np_random.uniform(low=[50, 50, 100, 100, -np.pi], high=[450, 450, 400, 400, np.pi])
             rs = np.random.RandomState(seed=seed)
             state = np.array(
                 [
-                    rs.randint(50, 450),
-                    rs.randint(50, 450),
-                    rs.randint(100, 400),
-                    rs.randint(100, 400),
-                    rs.randn() * 2 * np.pi - np.pi,
+                    self.np_random.uniform(self.min_pos[0], self.max_pos[0]),  # agent_x
+                    self.np_random.uniform(self.min_pos[1], self.max_pos[1]),  # agent_y
+                    self.np_random.uniform(self.min_pos[0], self.max_pos[0]),  # block_x
+                    self.np_random.uniform(self.min_pos[1], self.max_pos[1]),  # block_y
+                    self.np_random.uniform(0, 2 * np.pi),  # block_angle
                 ],
                 # dtype=np.float64
             )
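Beyond adding the goal branch, this hunk changes the seeding model: the old code drew the start state from a locally constructed `np.random.RandomState(seed=seed)` rather than from Gymnasium's managed generator, while the new code uses `self.np_random`, which `super().reset(seed=seed)` seeds (the now-unused `rs = ...` line appears to survive as context). The practical contract after this change, which is standard Gymnasium semantics rather than anything specific to this PR:

import gymnasium as gym
import gym_pusht
import numpy as np

env = gym.make("gym_pusht/PushT-v1", obs_type="state")

obs_a, _ = env.reset(seed=123)
obs_b, _ = env.reset(seed=123)
np.testing.assert_allclose(obs_a, obs_b)  # same seed: same agent, block, and goal

obs_c, _ = env.reset()  # no seed: continues the existing np_random stream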
@@ -385,14 +423,26 @@ def get_obs(self):
             agent_position = np.array(self.agent.position)
             block_position = np.array(self.block.position)
             block_angle = self.block.angle % (2 * np.pi)
-            return np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64)
+            goal_position = self.goal_pose[:2]
+            goal_angle = self.goal_pose[2]
+            obs = np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64)
+
+            if self.randomize_goal:
+                obs = np.concatenate([obs, goal_position, [goal_angle]], dtype=np.float64)
+
+            return obs
 
         if self.obs_type == "environment_state_agent_pos":
-            return {
+            obs = {
                 "environment_state": self.get_keypoints(self._block_shapes).flatten(),
                 "agent_pos": np.array(self.agent.position),
             }
+
+            if self.randomize_goal:
+                obs["goal_state"] = self.get_keypoints(self._goal_shapes).flatten()
+
+            return obs
 
         pixels = self._render()
         if self.obs_type == "pixels":
             return pixels
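With `environment_state` and `goal_state` sharing the same keypoint layout, goal-conditioned error metrics become one-liners downstream. An illustrative sketch (this is not the environment's reward, which the diff does not touch):

import numpy as np

def keypoint_error(obs):
    # Mean Euclidean distance between block and goal keypoints (illustrative).
    block = obs["environment_state"].reshape(-1, 2)  # 8 keypoints as (x, y) rows
    goal = obs["goal_state"].reshape(-1, 2)
    return np.linalg.norm(block - goal, axis=1).mean()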
@@ -446,7 +496,9 @@ def _setup(self):
         # Add agent, block, and goal zone
         self.agent = self.add_circle(self.space, (256, 400), 15)
         self.block, self._block_shapes = self.add_tee(self.space, (256, 300), 0)
+        # Default goal pose that will be used if randomization is disabled
         self.goal_pose = np.array([256, 256, np.pi / 4])  # x, y, theta (in radians)
+        self._update_goal_shapes()
         if self.block_cog is not None:
             self.block.center_of_gravity = self.block_cog
 
@@ -466,6 +518,15 @@ def _set_state(self, state):
         # Run physics to take effect
         self.space.step(self.dt)
 
+    def _update_goal_shapes(self):
+        goal_body = self.get_goal_pose_body(self.goal_pose)
+        self._goal_shapes = []
+        for shape in self.block.shapes:
+            verts = shape.get_vertices()
+            new_shape = pymunk.Poly(goal_body, verts)
+            self._goal_shapes.append(new_shape)
+        self._goal_body = goal_body
+
     @staticmethod
     def add_segment(space, a, b, radius):
         # TODO(rcadene): rename add_segment to make_segment, since it is not added to the space
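`_update_goal_shapes` works because pymunk polygon vertices are stored in body-local coordinates: `shape.get_vertices()` returns the T's geometry relative to the block body, so attaching copies of those polygons to a body posed at `goal_pose` reproduces the T at the goal without duplicating the geometry constants, and `get_keypoints(self._goal_shapes)` can then read world-space keypoints through that body. A standalone sketch of that pymunk mechanic (assuming `get_goal_pose_body` returns a body with `position` and `angle` set, as the existing rendering code suggests):

import pymunk

body_a = pymunk.Body(body_type=pymunk.Body.STATIC)
square = pymunk.Poly(body_a, [(-10, -10), (10, -10), (10, 10), (-10, 10)])

body_b = pymunk.Body(body_type=pymunk.Body.STATIC)
body_b.position = (100, 200)

# get_vertices() is body-local, so the copy inherits body_b's pose.
copy = pymunk.Poly(body_b, square.get_vertices())
world_xs = sorted(body_b.local_to_world(v).x for v in copy.get_vertices())
print(world_xs)  # [90.0, 90.0, 110.0, 110.0]: shifted by body_b.position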