2 changes: 1 addition & 1 deletion example.py
@@ -1,7 +1,7 @@
import gymnasium as gym
import gym_pusht

env = gym.make("gym_pusht/PushT-v0", render_mode="human")
env = gym.make("gym_pusht/PushT-v1", render_mode="human")
observation, info = env.reset()

for _ in range(1000):
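A minimal rollout sketch against the new `gym_pusht/PushT-v1` registration, assuming only the standard Gymnasium API (random actions, human rendering):

import gymnasium as gym
import gym_pusht  # noqa: F401  -- importing registers the PushT environments

env = gym.make("gym_pusht/PushT-v1", render_mode="human")
observation, info = env.reset(seed=0)

for _ in range(1000):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        observation, info = env.reset()

env.close()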
10 changes: 9 additions & 1 deletion gym_pusht/__init__.py
@@ -4,5 +4,13 @@
id="gym_pusht/PushT-v0",
entry_point="gym_pusht.envs:PushTEnv",
max_episode_steps=300,
kwargs={"obs_type": "state"},
kwargs={"obs_type": "state", "randomize_goal": False},
)

# Register a version with randomized goal
register(
id="gym_pusht/PushT-v1",
entry_point="gym_pusht.envs:PushTEnv",
max_episode_steps=300,
kwargs={"obs_type": "state", "randomize_goal": True},
)
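Assuming Gymnasium's usual behavior of letting `gym.make` kwargs override the kwargs stored at registration, the flag can also be toggled (or combined with another observation type) without adding further ids; a sketch:

import gymnasium as gym
import gym_pusht  # noqa: F401

# make-time kwargs take precedence over the register() kwargs above
env_v0_random = gym.make("gym_pusht/PushT-v0", randomize_goal=True)
env_v1_keypoints = gym.make("gym_pusht/PushT-v1", obs_type="environment_state_agent_pos")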
88 changes: 74 additions & 14 deletions gym_pusht/envs/pusht.py
@@ -54,13 +54,15 @@ class PushTEnv(gym.Env):

If `obs_type` is set to `state`, the observation space is a 5-dimensional vector representing the state of the
environment: [agent_x, agent_y, block_x, block_y, block_angle]. The values are in the range [0, 512] for the agent
and block positions and [0, 2*pi] for the block angle.
and block positions and [0, 2*pi] for the block angle. When `randomize_goal=True`, the observation is extended by [goal_x, goal_y, goal_theta] to become 8-dimensional.

If `obs_type` is set to `environment_state_agent_pos` the observation space is a dictionary with:
- `environment_state`: 16-dimensional vector representing the keypoint locations of the T (in [x0, y0, x1, y1, ...]
format). The values are in the range [0, 512]. See `get_keypoints` for a diagram showing the location of the
keypoint indices.
- `agent_pos`: A 2-dimensional vector representing the position of the robot end-effector.
- `goal_state`: (only present when `randomize_goal=True`) a 16-dimensional vector representing the keypoint locations
of the goal T shape, in the same format as `environment_state`.

If `obs_type` is set to `pixels`, the observation space is a 96x96 RGB image of the environment.

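As a sketch of the extended `state` observation layout when `randomize_goal=True` (indices follow the description above; the 8-element shape matches the Box space defined in `_initialize_observation_space` below):

import gymnasium as gym
import gym_pusht  # noqa: F401

env = gym.make("gym_pusht/PushT-v1")  # obs_type="state" is the registered default
obs, info = env.reset(seed=0)
assert obs.shape == (8,)

agent_xy = obs[0:2]    # [agent_x, agent_y], in [0, 512]
block_xy = obs[2:4]    # [block_x, block_y], in [0, 512]
block_angle = obs[4]   # in [0, 2*pi)
goal_xy = obs[5:7]     # [goal_x, goal_y], present only because randomize_goal=True
goal_theta = obs[7]    # goal orientation, in [0, 2*pi)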
@@ -105,6 +107,8 @@ class PushTEnv(gym.Env):

* `visualization_height`: (int) The height of the visualized image. Default is `680`.

* `randomize_goal`: (bool) Whether to randomize the goal pose (position and orientation) on each reset. Default is `False`.

## Reset Arguments

Passing the option `options["reset_to_state"]` will reset the environment to a specific state.
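A sketch of using this option; the five values are [agent_x, agent_y, block_x, block_y, block_angle], and with `randomize_goal=True` the goal pose is still re-sampled on such a reset:

import numpy as np
import gymnasium as gym
import gym_pusht  # noqa: F401

env = gym.make("gym_pusht/PushT-v1")
obs, info = env.reset(
    seed=0,
    options={"reset_to_state": np.array([256, 400, 256, 300, 0.0])},
)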
@@ -144,11 +148,15 @@ def __init__(
observation_height=96,
visualization_width=680,
visualization_height=680,
randomize_goal=False,
):
super().__init__()
# Observations
self.obs_type = obs_type

# Goal Randomization
self.randomize_goal = randomize_goal

# Rendering
self.render_mode = render_mode
self.observation_width = observation_width
@@ -167,6 +175,11 @@ def __init__(
self.block_cog = block_cog
self.damping = damping

# Safe margins from walls for positioning objects
self.margin = 140 # Margin from walls to avoid spawning too close to edges
self.min_pos = np.array([self.margin, self.margin])
self.max_pos = np.array([512 - self.margin, 512 - self.margin])

# If human-rendering is used, `self.window` will be a reference
# to the window that we draw to. `self.clock` will be a clock that is used
# to ensure that the environment is rendered at the correct framerate in
@@ -182,12 +195,20 @@ def __init__(self):

def _initialize_observation_space(self):
if self.obs_type == "state":
# [agent_x, agent_y, block_x, block_y, block_angle]
self.observation_space = spaces.Box(
low=np.array([0, 0, 0, 0, 0]),
high=np.array([512, 512, 512, 512, 2 * np.pi]),
dtype=np.float64,
)
if self.randomize_goal:
# [agent_x, agent_y, block_x, block_y, block_angle, goal_x, goal_y, goal_theta]
self.observation_space = spaces.Box(
low=np.array([0, 0, 0, 0, 0, 0, 0, 0]),
high=np.array([512, 512, 512, 512, 2 * np.pi, 512, 512, 2 * np.pi]),
dtype=np.float64,
)
else:
# [agent_x, agent_y, block_x, block_y, block_angle]
self.observation_space = spaces.Box(
low=np.array([0, 0, 0, 0, 0]),
high=np.array([512, 512, 512, 512, 2 * np.pi]),
dtype=np.float64,
)
elif self.obs_type == "environment_state_agent_pos":
self.observation_space = spaces.Dict(
{
@@ -203,6 +224,12 @@ def _initialize_observation_space(self):
),
},
)
if self.randomize_goal:
self.observation_space["goal_state"] = spaces.Box(
low=np.zeros(16),
high=np.full((16,), 512),
dtype=np.float64,
)
elif self.obs_type == "pixels":
self.observation_space = spaces.Box(
low=0, high=255, shape=(self.observation_height, self.observation_width, 3), dtype=np.uint8
@@ -269,18 +296,28 @@ def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._setup()

# Randomize goal if enabled
if self.randomize_goal:
# Randomize goal position and orientation
goal_x = self.np_random.uniform(self.min_pos[0], self.max_pos[0])
goal_y = self.np_random.uniform(self.min_pos[1], self.max_pos[1])
goal_theta = self.np_random.uniform(0, 2 * np.pi)
self.goal_pose = np.array([goal_x, goal_y, goal_theta])
self._update_goal_shapes()

# Handle state reset
if options is not None and options.get("reset_to_state") is not None:
state = np.array(options.get("reset_to_state"))
else:
# state = self.np_random.uniform(low=[50, 50, 100, 100, -np.pi], high=[450, 450, 400, 400, np.pi])
rs = np.random.RandomState(seed=seed)
state = np.array(
[
rs.randint(50, 450),
rs.randint(50, 450),
rs.randint(100, 400),
rs.randint(100, 400),
rs.randn() * 2 * np.pi - np.pi,
self.np_random.uniform(self.min_pos[0], self.max_pos[0]), # agent_x
self.np_random.uniform(self.min_pos[1], self.max_pos[1]), # agent_y
self.np_random.uniform(self.min_pos[0], self.max_pos[0]), # block_x
self.np_random.uniform(self.min_pos[1], self.max_pos[1]), # block_y
self.np_random.uniform(0, 2 * np.pi), # block_angle
],
# dtype=np.float64
)
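A sketch of what this randomization implies downstream, assuming the default `margin = 140` defined in `__init__` (so sampled coordinates lie in [140, 372]) and the 8-dimensional `state` observation:

import numpy as np
import gymnasium as gym
import gym_pusht  # noqa: F401

env = gym.make("gym_pusht/PushT-v1")
for seed in range(3):
    obs, info = env.reset(seed=seed)
    goal_x, goal_y, goal_theta = obs[5], obs[6], obs[7]
    # goal position stays inside the wall margins, orientation in [0, 2*pi]
    assert 140 <= goal_x <= 512 - 140 and 140 <= goal_y <= 512 - 140
    assert 0 <= goal_theta <= 2 * np.pi
    # the goal is drawn from the env's seeded RNG, so the same seed reproduces the same pose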
@@ -385,14 +422,26 @@ def get_obs(self):
agent_position = np.array(self.agent.position)
block_position = np.array(self.block.position)
block_angle = self.block.angle % (2 * np.pi)
return np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64)
goal_position = self.goal_pose[:2]
goal_angle = self.goal_pose[2]
obs = np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64)

if self.randomize_goal:
obs = np.concatenate([obs, goal_position, [goal_angle]], dtype=np.float64)

return obs

if self.obs_type == "environment_state_agent_pos":
return {
obs = {
"environment_state": self.get_keypoints(self._block_shapes).flatten(),
"agent_pos": np.array(self.agent.position),
}

if self.randomize_goal:
obs["goal_state"] = self.get_keypoints(self._goal_shapes).flatten()

return obs

pixels = self._render()
if self.obs_type == "pixels":
return pixels
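For the keypoint observation type, the goal keypoints come back under their own key; a minimal shape check, assuming make-time kwargs may override the registered `obs_type`:

import gymnasium as gym
import gym_pusht  # noqa: F401

env = gym.make("gym_pusht/PushT-v1", obs_type="environment_state_agent_pos")
obs, info = env.reset(seed=0)

assert obs["environment_state"].shape == (16,)  # block T keypoints, [x0, y0, x1, y1, ...]
assert obs["agent_pos"].shape == (2,)
assert obs["goal_state"].shape == (16,)         # goal T keypoints, same format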
@@ -446,7 +495,9 @@ def _setup(self):
# Add agent, block, and goal zone
self.agent = self.add_circle(self.space, (256, 400), 15)
self.block, self._block_shapes = self.add_tee(self.space, (256, 300), 0)
# Default goal pose that will be used if randomization is disabled
self.goal_pose = np.array([256, 256, np.pi / 4]) # x, y, theta (in radians)
self._update_goal_shapes()
if self.block_cog is not None:
self.block.center_of_gravity = self.block_cog

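With randomization disabled (the `PushT-v0` default), this fixed pose is re-created by `_setup` on every reset, so the goal never moves and the `state` observation stays 5-dimensional; a small sketch of that invariant:

import numpy as np
import gymnasium as gym
import gym_pusht  # noqa: F401

env = gym.make("gym_pusht/PushT-v0")
obs, info = env.reset(seed=0)
assert obs.shape == (5,)
# goal_pose is rebuilt as [256, 256, pi / 4] on each reset when randomize_goal=False
assert np.allclose(env.unwrapped.goal_pose, [256, 256, np.pi / 4])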
@@ -466,6 +517,15 @@ def _set_state(self, state):
# Run physics to take effect
self.space.step(self.dt)

def _update_goal_shapes(self):
# Rebuild the goal's collision shapes by copying the block's shape vertices onto a body placed at the current goal pose
goal_body = self.get_goal_pose_body(self.goal_pose)
self._goal_shapes = []
for shape in self.block.shapes:
verts = shape.get_vertices()
new_shape = pymunk.Poly(goal_body, verts)
self._goal_shapes.append(new_shape)
self._goal_body = goal_body

@staticmethod
def add_segment(space, a, b, radius):
# TODO(rcadene): rename add_segment to make_segment, since it is not added to the space