-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathrobot_utils.py
More file actions
209 lines (168 loc) · 8.21 KB
/
robot_utils.py
File metadata and controls
209 lines (168 loc) · 8.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os
from libero.libero.envs import OffScreenRenderEnv
from libero.libero import benchmark, get_libero_path
import tensorflow as tf
import numpy as np
import math
import time
import imageio
# Timestamp strings captured ONCE at import time (they do not refresh per call).
DATE = time.strftime("%Y_%m_%d")  # e.g. "2024_01_31"
DATE_TIME = time.strftime("%Y_%m_%d-%H_%M_%S")  # e.g. "2024_01_31-13_45_07"
class GenerateConfig:
    """Default configuration for LIBERO evaluation rollouts.

    Values are plain class attributes (shared defaults), read via attribute
    access elsewhere in this module (e.g. `cfg.model_family`, `cfg.task_suite_name`).
    """
    #################################################################################################################
    # Model-specific parameters
    #################################################################################################################
    model_family: str = "openvla"                    # Model family
    pretrained_checkpoint: str = ""                  # Pretrained checkpoint path
    load_in_8bit: bool = False                       # (For OpenVLA only) Load with 8-bit quantization
    load_in_4bit: bool = False                       # (For OpenVLA only) Load with 4-bit quantization
    center_crop: bool = True                         # Center crop? (if trained w/ random crop image aug)
    #################################################################################################################
    # LIBERO environment-specific parameters
    #################################################################################################################
    task_suite_name: str = "libero_object"           # Task suite. Options: libero_spatial, libero_object, libero_goal, libero_10, libero_90
    num_steps_wait: int = 10                         # Number of steps to wait for objects to stabilize in sim
    num_trials_per_task: int = 50                    # Number of rollouts per task
    seed: int = 7                                    # Random Seed (for reproducibility)
def get_libero_env(task, model_family, resolution=256):
    """Build an off-screen LIBERO environment for `task`.

    Args:
        task: LIBERO task object (provides `language`, `problem_folder`, `bddl_file`).
        model_family: Unused here; kept for call-site compatibility.
        resolution: Square camera resolution (height and width) in pixels.

    Returns:
        (env, task_description) — the seeded environment and the task's language string.
    """
    bddl_path = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
    env = OffScreenRenderEnv(
        bddl_file_name=bddl_path,
        camera_heights=resolution,
        camera_widths=resolution,
    )
    env.seed(0)
    return env, task.language
def get_image_resize_size(cfg):
    """Return the image resize size for the configured model class.

    If the returned size is an int, the resized image is square; a tuple would
    denote a rectangle.

    NOTE(review): 1024 here differs from the 224x224 target used by
    `crop_and_resize` below — confirm this is intentional.

    Raises:
        ValueError: If `cfg.model_family` is not recognized.
    """
    if cfg.model_family != "openvla":
        raise ValueError("Unexpected `model_family` found in config.")
    return 1024
def get_task(cfg, task_id: int):
    """Instantiate the configured benchmark suite and fetch task `task_id` from it.

    Returns:
        (task, task_suite) — the requested task and the suite it came from.
    """
    suite = benchmark.get_benchmark_dict()[cfg.task_suite_name]()
    return suite.get_task(task_id), suite
def get_libero_dummy_action(model_family: str):
    """Return a dummy/no-op action, used to roll out the simulation while the robot does nothing.

    Six zero pose deltas plus a -1 gripper command; `model_family` is currently unused.
    """
    return [0] * 6 + [-1]
def resize_image(img, resize_size):
    """Resize a single image (numpy array) to `resize_size` and return a uint8 numpy array.

    NOTE (Moo Jin): To keep test-time inputs in distribution with training inputs,
    this mirrors the resizing scheme of the Octo dataloader, which OpenVLA uses
    for training: a JPEG encode/decode round-trip first, then a Lanczos resize.
    """
    assert isinstance(resize_size, tuple)
    # JPEG round-trip reproduces the compression applied by the RLDS dataset builder.
    decoded = tf.io.decode_image(tf.image.encode_jpeg(img), expand_animations=False, dtype=tf.uint8)
    resized = tf.image.resize(decoded, resize_size, method="lanczos3", antialias=True)
    clamped = tf.cast(tf.clip_by_value(tf.round(resized), 0, 255), tf.uint8)
    return clamped.numpy()
def get_libero_image(obs, resize_size):
    """Extract the agent-view camera image from `obs` and preprocess it for the model.

    Args:
        obs: Observation dict containing an "agentview_image" array.
        resize_size: Target size — an int (square) or an (H, W) tuple.
    """
    assert isinstance(resize_size, (int, tuple))
    size = (resize_size, resize_size) if isinstance(resize_size, int) else resize_size
    # IMPORTANT: flip both axes (180-degree rotation) to match train preprocessing.
    frame = obs["agentview_image"][::-1, ::-1]
    return resize_image(frame, size)
def normalize_gripper_action(action, binarize=True):
    """Rescale the gripper command (last element of `action`) from [0, 1] to [-1, +1], in place.

    Necessary for some environments (not Bridge) because the dataset wrapper
    standardizes gripper actions to [0, 1]; unlike the other action dimensions,
    the gripper is not normalized to [-1, +1] by default.

    Normalization formula: y = 2 * (x - orig_low) / (orig_high - orig_low) - 1

    Args:
        action: Numpy array whose last axis ends with the gripper command.
        binarize: If True, snap the rescaled gripper value to -1/0/+1 via sign.

    Returns:
        The same array, mutated in place.
    """
    low, high = 0.0, 1.0
    action[..., -1] = 2 * (action[..., -1] - low) / (high - low) - 1
    if binarize:
        # Collapse to a hard open/close command.
        action[..., -1] = np.sign(action[..., -1])
    return action
def invert_gripper_action(action):
    """Negate the gripper command (last element of `action`), in place.

    Needed for environments where -1 = open and +1 = close, since the RLDS
    dataloader aligns gripper actions such that 0 = close, 1 = open.

    Returns:
        The same array, mutated in place.
    """
    action[..., -1] = -action[..., -1]
    return action
def save_rollout_video(rollout_images, idx, success, task_description, output_dir):
    """Saves an MP4 replay of an episode.

    Args:
        rollout_images: Iterable of frames (numpy arrays) to write, in order.
        idx: Episode index, embedded in the filename.
        success: Unused here; kept for call-site compatibility.
        task_description: Free-form task prompt; sanitized and truncated for the filename.
        output_dir: Directory to write into (created if missing).

    Returns:
        Path of the written MP4 file.
    """
    os.makedirs(output_dir, exist_ok=True)
    # Sanitize the prompt for use in a filename and cap its length at 50 chars.
    processed_task_description = task_description.lower().replace(" ", "_").replace("\n", "_").replace(".", "_")[:50]
    mp4_path = f"{output_dir}/episode={idx}--prompt={processed_task_description}.mp4"
    video_writer = imageio.get_writer(mp4_path, fps=30)
    try:
        for img in rollout_images:
            video_writer.append_data(img)
    finally:
        # Always close the writer so the container is finalized even if a frame write fails.
        video_writer.close()
    return mp4_path
def quat2axisangle(quat):
    """
    Adapted from robosuite: https://github.com/ARISE-Initiative/robosuite/blob/eafb81f54ffc104f905ee48a16bb15f059176ad3/robosuite/utils/transform_utils.py#L490C1-L512C55
    Converts quaternion to axis-angle format.
    Returns a unit vector direction scaled by its angle in radians.

    Args:
        quat (np.array): (x,y,z,w) vec4 float angles

    Returns:
        np.array: (ax,ay,az) axis-angle exponential coordinates
    """
    # Clamp w into [-1, 1] locally so acos is well-defined. Unlike the robosuite
    # original, this does NOT mutate the caller's quaternion in place.
    w = min(max(quat[3], -1.0), 1.0)
    den = np.sqrt(1.0 - w * w)
    if math.isclose(den, 0.0):
        # This is (close to) a zero degree rotation, immediately return
        return np.zeros(3)
    return (quat[:3] * 2.0 * math.acos(w)) / den
def crop_and_resize(image, crop_scale, batch_size, target_size=(224, 224)):
    """
    Center-crops an image to have area `crop_scale` * (original image area), and then resizes back
    up. We use the same logic seen in the `dlimp` RLDS datasets wrapper to avoid
    distribution shift at test time.

    Args:
        image: TF Tensor of shape (batch_size, H, W, C) or (H, W, C) and datatype tf.float32 with
            values between [0,1].
        crop_scale: The area of the center crop with respect to the original image.
        batch_size: Batch size.
        target_size: (height, width) of the output; defaults to (224, 224), the
            previously hard-coded value, so existing callers are unaffected.

    Returns:
        The cropped-and-resized image, with the same rank as the input.
    """
    # Convert from 3D Tensor (H, W, C) to 4D Tensor (batch_size, H, W, C)
    assert image.shape.ndims == 3 or image.shape.ndims == 4
    expanded_dims = False
    if image.shape.ndims == 3:
        image = tf.expand_dims(image, axis=0)
        expanded_dims = True
    # Crop side length scales with the square root of the area: a crop of relative
    # area `crop_scale` has relative height/width sqrt(crop_scale), clipped to [0, 1].
    new_heights = tf.reshape(tf.clip_by_value(tf.sqrt(crop_scale), 0, 1), shape=(batch_size,))
    new_widths = tf.reshape(tf.clip_by_value(tf.sqrt(crop_scale), 0, 1), shape=(batch_size,))
    # Bounding boxes in normalized [y1, x1, y2, x2] coordinates, centered in the frame.
    height_offsets = (1 - new_heights) / 2
    width_offsets = (1 - new_widths) / 2
    bounding_boxes = tf.stack(
        [
            height_offsets,
            width_offsets,
            height_offsets + new_heights,
            width_offsets + new_widths,
        ],
        axis=1,
    )
    # Crop and then resize back up to the target resolution
    image = tf.image.crop_and_resize(image, bounding_boxes, tf.range(batch_size), target_size)
    # Convert back to 3D Tensor (H, W, C)
    if expanded_dims:
        image = image[0]
    return image