Skip to content
8 changes: 7 additions & 1 deletion examples/droid/README_train.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ You can download the DROID dataset with the following command (after installing
gsutil -m cp -r gs://gresearch/robotics/droid/1.0.1 <your_download_path>/droid/1.0.1
```

Note that downloading version 1.0.1 is important (not v1.0.0): it contains the complete set of language annotations (~75k episodes) while v1.0.0 only has annotations for 30k episodes.
Note that downloading version 1.0.1 is important (not v1.0.0): it contains the complete set of language annotations (~75k episodes) while v1.0.0 only has annotations for 30k episodes. If for some reason you would like to use another version, modify the line `version="1.0.1"` in the `DroidRldsDataset` object [here](src/openpi/training/droid_rlds_dataset.py).

You will need 1.8TB of disk storage to download the DROID RLDS dataset.

Expand All @@ -38,6 +38,12 @@ Run training:
uv run --group rlds scripts/train.py pi0_fast_droid_finetune --exp-name=my_experiment --overwrite
```

By default, training uses no filtering. Alternatively, you can use a custom filtering scheme by providing a json that maps from episode keys to a list of time step ranges (denoted as a tuple of start and end time step indices) in that episode you wish to keep. The episode key is a unique ID defined as `f"{recording_folderpath}--{file_path}"`. We choose this convention because both paths are easily accessible in the DROID RLDS episodes' metadata.

We provide an example of such a filtering scheme in [compute_droid_nonidle_ranges.py](examples/droid/compute_droid_nonidle_ranges.py), which is significantly more aggressive than the default (and thus leads to policies that take significantly fewer idle actions). We recommend using the filter produced by this script, and have also provided a copy of the filter [here](https://huggingface.co/KarlP/droid#filtering-data) specifically for `droid/1.0.1`.

The filter json you wish to use can be specified by modifying the line `filter_dict_path="<path_to_filter_dict>"` in [src/openpi/training/config.py](src/openpi/training/config.py).

**Note**: The original pi0-FAST-DROID model was trained with joint velocity actions.
Joint velocity actions are not compatible with simulated evaluation environments (much harder to simulate).
Thus, we do not recommend training with joint velocity actions and instead use joint position actions here.
Expand Down
102 changes: 102 additions & 0 deletions examples/droid/compute_droid_nonidle_ranges.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""
Iterates through the DROID dataset and produces a json mapping from episode unique IDs to ranges of time steps
that should not be filtered out (all others are).

Specifically, we look for ranges of consecutive steps that contain fewer than min_idle_len consecutive idle frames
(default to 7 -- as most DROID action-chunking policies run the first 8 actions generated in each chunk, filtering
this way means the policy will not get stuck outputting stationary actions). Additionally, we also only keep non-idle
ranges of length at least min_non_idle_len (default to 16 frames = ~1 second), while also removing the last
filter_last_n_in_ranges frames from the end of each range (as those all correspond to action chunks with many idle actions).

This leaves us with trajectory segments consisting of contiguous, significant movement. Training on this filtered set
yields policies that output fewer stationary actions (i.e., get "stuck" in states less).
"""

import json
import os
from pathlib import Path

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from tqdm import tqdm

os.environ["CUDA_VISIBLE_DEVICES"] = "" # Set to the GPU you want to use, or leave empty for CPU

builder = tfds.builder_from_directory(
# path to the `droid` directory (not its parent)
builder_dir="<path_to_droid_dataset_tfds_files>",
)
ds = builder.as_dataset(split="train", shuffle_files=False)
tf.data.experimental.ignore_errors(ds)

keep_ranges_path = "<path_to_where_to_save_the_json>"

min_idle_len = 7 # If more than this number of consecutive idle frames, filter all of them out
min_non_idle_len = 16 # If fewer than this number of consecutive non-idle frames, filter all of them out
filter_last_n_in_ranges = 10 # When using a filter dict, remove this many frames from the end of each range

keep_ranges_map = {}
if Path(keep_ranges_path).exists():
with Path(keep_ranges_path).open("r") as f:
keep_ranges_map = json.load(f)
print(f"Resuming from {len(keep_ranges_map)} episodes already processed")

for ep_idx, ep in enumerate(tqdm(ds)):
recording_folderpath = ep["episode_metadata"]["recording_folderpath"].numpy().decode()
file_path = ep["episode_metadata"]["file_path"].numpy().decode()

key = f"{recording_folderpath}--{file_path}"
if key in keep_ranges_map:
continue

joint_velocities = [step["action_dict"]["joint_velocity"].numpy() for step in ep["steps"]]
joint_velocities = np.array(joint_velocities)

is_idle_array = np.hstack(
[np.array([False]), np.all(np.abs(joint_velocities[1:] - joint_velocities[:-1]) < 1e-3, axis=1)]
)

# Find what steps go from idle to non-idle and vice-versa
is_idle_padded = np.concatenate(
[[False], is_idle_array, [False]]
) # Start and end with False, so idle at first step is a start of motion

is_idle_diff = np.diff(is_idle_padded.astype(int))
is_idle_true_starts = np.where(is_idle_diff == 1)[0] # +1 transitions --> going from idle to non-idle
is_idle_true_ends = np.where(is_idle_diff == -1)[0] # -1 transitions --> going from non-idle to idle

# Find which steps correspond to idle segments of length at least min_idle_len
true_segment_masks = (is_idle_true_ends - is_idle_true_starts) >= min_idle_len
is_idle_true_starts = is_idle_true_starts[true_segment_masks]
is_idle_true_ends = is_idle_true_ends[true_segment_masks]

keep_mask = np.ones(len(joint_velocities), dtype=bool)
for start, end in zip(is_idle_true_starts, is_idle_true_ends, strict=True):
keep_mask[start:end] = False

# Get all non-idle ranges of at least 16
# Same logic as above, but for keep_mask, allowing us to filter out contiguous ranges of length < min_non_idle_len
keep_padded = np.concatenate([[False], keep_mask, [False]])

keep_diff = np.diff(keep_padded.astype(int))
keep_true_starts = np.where(keep_diff == 1)[0] # +1 transitions --> going from filter out to keep
keep_true_ends = np.where(keep_diff == -1)[0] # -1 transitions --> going from keep to filter out

# Find which steps correspond to non-idle segments of length at least min_non_idle_len
true_segment_masks = (keep_true_ends - keep_true_starts) >= min_non_idle_len
keep_true_starts = keep_true_starts[true_segment_masks]
keep_true_ends = keep_true_ends[true_segment_masks]

# Add mapping from episode unique ID key to list of non-idle ranges to keep
keep_ranges_map[key] = []
for start, end in zip(keep_true_starts, keep_true_ends, strict=True):
keep_ranges_map[key].append((int(start), int(end) - filter_last_n_in_ranges))

if ep_idx % 1000 == 0:
with Path(keep_ranges_path).open("w") as f:
json.dump(keep_ranges_map, f)

print("Done!")
with Path(keep_ranges_path).open("w") as f:
json.dump(keep_ranges_map, f)
11 changes: 11 additions & 0 deletions src/openpi/training/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class DataConfig:
rlds_data_dir: str | None = None
# Action space for DROID dataset.
action_space: droid_rlds_dataset.DroidActionSpace | None = None
# Path to the filter dictionary file for DROID dataset
filter_dict_path: str | None = None


class GroupFactory(Protocol):
Expand Down Expand Up @@ -344,6 +346,12 @@ class RLDSDroidDataConfig(DataConfigFactory):
rlds_data_dir: str | None = None
action_space: droid_rlds_dataset.DroidActionSpace | None = None

# Filtering options. Optionally pass a path to a JSON dictionary that maps each episode key
# to a list of (start, end) tuples denoting the time step ranges to keep. Episodes are
# uniquely identified with f"{recording_folderpath}--{file_path}", both of which are
# present in the RLDS episode metadata.
filter_dict_path: str | None = None

@override
def create(self, assets_dirs: pathlib.Path, model_config: _model.BaseModelConfig) -> DataConfig:
repack_transform = _transforms.Group(
Expand Down Expand Up @@ -386,6 +394,7 @@ def create(self, assets_dirs: pathlib.Path, model_config: _model.BaseModelConfig
use_quantile_norm=model_config.model_type == ModelType.PI0_FAST,
rlds_data_dir=self.rlds_data_dir,
action_space=self.action_space,
filter_dict_path=self.filter_dict_path,
)


Expand Down Expand Up @@ -684,6 +693,8 @@ def __post_init__(self) -> None:
# Set this to the path to your DROID RLDS dataset (the parent directory of the `droid` directory).
rlds_data_dir="<path_to_droid_rlds_dataset>",
action_space=droid_rlds_dataset.DroidActionSpace.JOINT_POSITION,
# Set this to the path for whatever filtering json you wish to use (or None)
filter_dict_path="<path_to_filtering_json_or_None>",
),
weight_loader=weight_loaders.CheckpointWeightLoader("gs://openpi-assets/checkpoints/pi0_fast_base/params"),
lr_schedule=_optimizer.CosineDecaySchedule(
Expand Down
1 change: 1 addition & 0 deletions src/openpi/training/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def create_rlds_dataset(
shuffle=shuffle,
action_chunk_size=action_horizon,
action_space=data_config.action_space,
filter_dict_path=data_config.filter_dict_path,
)


Expand Down
73 changes: 61 additions & 12 deletions src/openpi/training/droid_rlds_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from enum import Enum
from enum import auto
from pathlib import Path


class DroidActionSpace(Enum):
Expand All @@ -31,6 +32,7 @@ def __init__(
shuffle_buffer_size: int = 250_000,
num_parallel_reads: int = -1, # -1 == tf.data.AUTOTUNE -- hack to not import tf at top level
num_parallel_calls: int = -1, # -1 == tf.data.AUTOTUNE -- hack to not import tf at top level
filter_dict_path=None,
):
# Import tensorflow here to not make it mandatory in case RLDS data loader is not used.
import dlimp as dl
Expand All @@ -40,7 +42,7 @@ def __init__(
# Configure Tensorflow with *no GPU devices* (to prevent clobber with PyTorch / JAX)
tf.config.set_visible_devices([], "GPU")

builder = tfds.builder("droid", data_dir=data_dir)
builder = tfds.builder("droid", data_dir=data_dir, version="1.0.1")
dataset = dl.DLataset.from_rlds(builder, split="train", shuffle=shuffle, num_parallel_reads=num_parallel_reads)

# Filter out any unsuccessful trajectories -- we use the file name to check this
Expand All @@ -53,6 +55,43 @@ def __init__(
# Repeat dataset so we never run out of data.
dataset = dataset.repeat()

# Load the filter dictionary if provided.
# The filter dictionary is a JSON file that maps episode keys to ranges of frames to keep
# (e.g.,
# {
# "keep_ranges": {
# "<episode key>": [[0, 100], [200, 300]]
# }
# }
# means keep frames 0-99 and 200-299, since each (start, end) range is end-exclusive).
# NOTE(review): the print below indexes filter_dict["keep_ranges"], but the loop
# iterates filter_dict.items() directly, and the example generator script writes a
# flat mapping with no "keep_ranges" wrapper -- confirm which structure is intended.
if filter_dict_path is not None:
import json

from tqdm import tqdm

with Path(filter_dict_path).open("r") as f:
filter_dict = json.load(f)

print(f"Using filter dictionary with {len(filter_dict['keep_ranges'])} episodes")

keys_tensor = []
values_tensor = []

for episode_key, ranges in tqdm(filter_dict.items()):
for start, end in ranges:
for t in range(start, end):
frame_key = f"{episode_key}--{t}"
keys_tensor.append(frame_key)
values_tensor.append(True)
self.filter_table = tf.lookup.StaticHashTable(
tf.lookup.KeyValueTensorInitializer(keys_tensor, values_tensor), default_value=False
)
print("Filter hash table initialized")
else:
self.filter_table = tf.lookup.StaticHashTable(
tf.lookup.KeyValueTensorInitializer([""], [True]), default_value=True
)

def restructure(traj):
"""Reformat observation and action keys, sample language instruction."""
# Important: we use joint *position* action space -- easier to simulate!
Expand Down Expand Up @@ -80,6 +119,21 @@ def restructure(traj):
[traj["language_instruction"], traj["language_instruction_2"], traj["language_instruction_3"]]
)[0]

traj_len = tf.shape(traj["action"])[0]
indices = tf.as_string(tf.range(traj_len))

# Compute a uniquely-identifying step ID by concatenating the recording folderpath, file path,
# and each step's time step index. This will index into the filter hash table, and if it returns true,
# then the frame passes the filter.
step_id = (
traj["traj_metadata"]["episode_metadata"]["recording_folderpath"]
+ "--"
+ traj["traj_metadata"]["episode_metadata"]["file_path"]
+ "--"
+ indices
)
passes_filter = self.filter_table.lookup(step_id)

return {
"actions": actions,
"observation": {
Expand All @@ -89,6 +143,8 @@ def restructure(traj):
"gripper_position": traj["observation"]["gripper_position"],
},
"prompt": instruction,
"step_id": step_id,
"passes_filter": passes_filter,
}

dataset = dataset.traj_map(restructure, num_parallel_calls)
Expand Down Expand Up @@ -119,17 +175,10 @@ def chunk_actions(traj):
# Flatten: map from trajectory dataset to dataset of individual action chunks
dataset = dataset.flatten(num_parallel_calls=num_parallel_calls)

# Filter out frames where actions are idle. Must be done after flattening, as filter should apply per-frame.
def filter_idle(traj):
"""Filter out chunks with idle actions.
--> we filter if at least first half of chunk does not move.
"""
if action_space == DroidActionSpace.JOINT_POSITION:
# Compute delta to first position in action chunk
return tf.reduce_any(tf.abs(traj["actions"][: action_chunk_size // 2] - traj["actions"][:1]) > 1e-3)
return tf.reduce_any(tf.abs(traj["actions"][: action_chunk_size // 2]) > 1e-3)

dataset = dataset.filter(filter_idle)
def filter_from_dict(frame):
return frame["passes_filter"]

dataset = dataset.filter(filter_from_dict)

# Decode images: RLDS saves encoded images, only decode now for efficiency
def decode_images(traj):
Expand Down