diff --git a/pyproject.toml b/pyproject.toml index e58f42a373..5259951f6f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -323,9 +323,9 @@ ignore_errors = false # module = "lerobot.processor.*" # ignore_errors = false -# [[tool.mypy.overrides]] -# module = "lerobot.datasets.*" -# ignore_errors = false +[[tool.mypy.overrides]] +module = "lerobot.datasets.*" +ignore_errors = false [[tool.mypy.overrides]] module = "lerobot.cameras.*" diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py index 870c9571e8..0bbe1d30a8 100644 --- a/src/lerobot/datasets/aggregate.py +++ b/src/lerobot/datasets/aggregate.py @@ -171,8 +171,8 @@ def aggregate_datasets( aggr_repo_id: str, roots: list[Path] | None = None, aggr_root: Path | None = None, - data_files_size_in_mb: float | None = None, - video_files_size_in_mb: float | None = None, + data_files_size_in_mb: int | None = None, + video_files_size_in_mb: int | None = None, chunk_size: int | None = None, ): """Aggregates multiple LeRobot datasets into a single unified dataset. @@ -450,7 +450,7 @@ def append_or_create_parquet_file( chunk_size: int, default_path: str, contains_images: bool = False, - aggr_root: Path = None, + aggr_root: Path = Path.cwd(), ): """Appends data to an existing parquet file or creates a new one based on size constraints. @@ -465,7 +465,7 @@ def append_or_create_parquet_file( chunk_size: Maximum number of files per chunk before incrementing chunk index. default_path: Format string for generating file paths. contains_images: Whether the data contains images requiring special handling. - aggr_root: Root path for the aggregated dataset. + aggr_root: Root path for the aggregated dataset. Defaults to Path.cwd() Returns: dict: Updated index dictionary with current chunk and file indices. 
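# The append_or_create_parquet_file docstring above describes a size-based rollover for
# parquet files: append to the current file until it reaches a size budget, then start a
# new one. Below is a minimal sketch of that idea with pyarrow, under assumed names
# (append_or_create, a flat file_{idx}.parquet layout); it is not LeRobot's implementation,
# whose real path format, chunking, and image handling differ.
import tempfile
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq


def append_or_create(root: Path, file_idx: int, new_table: pa.Table, max_size_in_mb: int) -> int:
    """Write new rows into the current file, or roll over to a new file when it is full."""
    path = root / f"file_{file_idx:03d}.parquet"
    if path.exists() and path.stat().st_size / (1024 * 1024) >= max_size_in_mb:
        file_idx += 1  # current file reached the size budget: start a new one
        path = root / f"file_{file_idx:03d}.parquet"
    if path.exists():
        new_table = pa.concat_tables([pq.read_table(path), new_table])  # append by rewriting
    pq.write_table(new_table, path, compression="snappy")
    return file_idx


if __name__ == "__main__":
    root = Path(tempfile.mkdtemp())
    idx = append_or_create(root, 0, pa.table({"frame_index": [0, 1, 2]}), max_size_in_mb=100)
    print(idx, pq.read_table(root / f"file_{idx:03d}.parquet").num_rows)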
diff --git a/src/lerobot/datasets/compute_stats.py b/src/lerobot/datasets/compute_stats.py index 61e174d5ca..3c26821024 100644 --- a/src/lerobot/datasets/compute_stats.py +++ b/src/lerobot/datasets/compute_stats.py @@ -32,17 +32,15 @@ class RunningQuantileStats: def __init__(self, quantile_list: list[float] | None = None, num_quantile_bins: int = 5000): self._count = 0 - self._mean = None + self._mean: np.ndarray | None = None self._mean_of_squares = None - self._min = None - self._max = None - self._histograms = None - self._bin_edges = None + self._min: np.ndarray | None = None + self._max: np.ndarray | None = None + self._histograms: list[np.ndarray] | None = None + self._bin_edges: list[np.ndarray] | None = None self._num_quantile_bins = num_quantile_bins - self._quantile_list = quantile_list - if self._quantile_list is None: - self._quantile_list = DEFAULT_QUANTILES + self._quantile_list: list[float] = quantile_list or DEFAULT_QUANTILES self._quantile_keys = [f"q{int(q * 100):02d}" for q in self._quantile_list] def update(self, batch: np.ndarray) -> None: @@ -65,6 +63,10 @@ def update(self, batch: np.ndarray) -> None: for i in range(vector_length) ] else: + assert self._mean is not None + assert self._min is not None + assert self._max is not None + if vector_length != self._mean.size: raise ValueError("The length of new vectors does not match the initialized vector length.") @@ -103,6 +105,10 @@ def get_statistics(self) -> dict[str, np.ndarray]: if self._count < 2: raise ValueError("Cannot compute statistics for less than 2 vectors.") + assert self._mean is not None + assert self._min is not None + assert self._max is not None + variance = self._mean_of_squares - self._mean**2 stddev = np.sqrt(np.maximum(0, variance)) @@ -150,12 +156,19 @@ def _adjust_histograms(self): def _update_histograms(self, batch: np.ndarray) -> None: """Update histograms with new vectors.""" + + assert self._histograms is not None + assert self._bin_edges is not None + for i in range(batch.shape[1]): hist, _ = np.histogram(batch[:, i], bins=self._bin_edges[i]) self._histograms[i] += hist def _compute_quantiles(self) -> list[np.ndarray]: """Compute quantiles based on histograms.""" + assert self._histograms is not None + assert self._bin_edges is not None + results = [] for q in self._quantile_list: target_count = q * self._count @@ -174,9 +187,9 @@ def _compute_single_quantile(self, hist: np.ndarray, edges: np.ndarray, target_c idx = np.searchsorted(cumsum, target_count) if idx == 0: - return edges[0] + return float(edges[0]) if idx >= len(cumsum): - return edges[-1] + return float(edges[-1]) # If not edge case, interpolate within the bin count_before = cumsum[idx - 1] @@ -242,6 +255,7 @@ def sample_images(image_paths: list[str]) -> np.ndarray: images[i] = img + assert images is not None return images @@ -318,7 +332,7 @@ def _reshape_for_feature_stats(value: np.ndarray, keepdims: bool) -> np.ndarray: def _reshape_for_global_stats( value: np.ndarray, keepdims: bool, original_shape: tuple[int, ...] -) -> np.ndarray | float: +) -> np.ndarray: """Reshape statistics for global reduction (axis=None).""" if keepdims: target_shape = tuple(1 for _ in original_shape) @@ -329,7 +343,7 @@ def _reshape_for_global_stats( def _reshape_single_stat( value: np.ndarray, axis: int | tuple[int, ...] | None, keepdims: bool, original_shape: tuple[int, ...] -) -> np.ndarray | float: +) -> np.ndarray: """Apply appropriate reshaping to a single statistic array. 
This function transforms statistic arrays to match expected output shapes @@ -508,11 +522,14 @@ def compute_episode_stats( if features[key]["dtype"] == "string": continue + axes_to_reduce: int | tuple[int, ...] | None if features[key]["dtype"] in ["image", "video"]: + assert isinstance(data, list) ep_ft_array = sample_images(data) axes_to_reduce = (0, 2, 3) keepdims = True else: + assert isinstance(data, np.ndarray) ep_ft_array = data axes_to_reduce = 0 keepdims = data.ndim == 1 @@ -562,7 +579,7 @@ def _assert_type_and_shape(stats_list: list[dict[str, dict]]): _validate_stat_value(stat_value, stat_key, feature_key) -def aggregate_feature_stats(stats_ft_list: list[dict[str, dict]]) -> dict[str, dict[str, np.ndarray]]: +def aggregate_feature_stats(stats_ft_list: list[dict[str, np.ndarray]]) -> dict[str, np.ndarray]: """Aggregates stats for a single feature.""" means = np.stack([s["mean"] for s in stats_ft_list]) variances = np.stack([s["std"] ** 2 for s in stats_ft_list]) @@ -617,7 +634,7 @@ def aggregate_stats(stats_list: list[dict[str, dict]]) -> dict[str, dict[str, np _assert_type_and_shape(stats_list) data_keys = {key for stats in stats_list for key in stats} - aggregated_stats = {key: {} for key in data_keys} + aggregated_stats: dict[str, dict[str, np.ndarray]] = {key: {} for key in data_keys} for key in data_keys: stats_with_key = [stats[key] for stats in stats_list if key in stats] diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index 2735ba0a05..691e363f4c 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -25,8 +25,9 @@ import logging import shutil -from collections.abc import Callable +from collections.abc import Callable, Mapping from pathlib import Path +from typing import Any, cast import datasets import numpy as np @@ -140,7 +141,7 @@ def delete_episodes( def split_dataset( dataset: LeRobotDataset, - splits: dict[str, float | list[int]], + splits: Mapping[str, float | list[int]], output_dir: str | Path | None = None, ) -> dict[str, LeRobotDataset]: """Split a LeRobotDataset into multiple smaller datasets. 
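# The _compute_single_quantile hunk in compute_stats.py above returns a bin edge in the
# boundary cases and otherwise interpolates inside the bin that crosses the target count.
# A self-contained sketch of that histogram-based quantile lookup is shown here; the
# function name and the interpolation details beyond what the hunk shows are illustrative,
# not the library's exact API.
import numpy as np


def histogram_quantile(hist: np.ndarray, edges: np.ndarray, target_count: float) -> float:
    cumsum = np.cumsum(hist)
    idx = int(np.searchsorted(cumsum, target_count))
    if idx == 0:
        return float(edges[0])
    if idx >= len(cumsum):
        return float(edges[-1])
    count_before = cumsum[idx - 1]
    count_in_bin = hist[idx]
    # linear interpolation within the bin that crosses the target count
    fraction = (target_count - count_before) / max(count_in_bin, 1)
    return float(edges[idx] + fraction * (edges[idx + 1] - edges[idx]))


samples = np.random.default_rng(0).random(100_000)
hist, edges = np.histogram(samples, bins=1000)
print(histogram_quantile(hist, edges, target_count=0.5 * len(samples)))  # ~0.5 for uniform data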
@@ -164,12 +165,13 @@ def split_dataset( raise ValueError("No splits provided") if all(isinstance(v, float) for v in splits.values()): - splits = _fractions_to_episode_indices(dataset.meta.total_episodes, splits) + splits = _fractions_to_episode_indices(dataset.meta.total_episodes, cast(dict[str, float], splits)) - all_episodes = set() + all_episodes: set[int] = set() for split_name, episodes in splits.items(): if not episodes: raise ValueError(f"Split '{split_name}' has no episodes") + assert not isinstance(episodes, float) episode_set = set(episodes) if episode_set & all_episodes: raise ValueError("Episodes cannot appear in multiple splits") @@ -186,6 +188,7 @@ def split_dataset( result_datasets = {} for split_name, episodes in splits.items(): + assert not isinstance(episodes, float) logging.info(f"Creating split '{split_name}' with {len(episodes)} episodes") split_repo_id = f"{dataset.repo_id}_{split_name}" @@ -441,8 +444,8 @@ def remove_feature( def _fractions_to_episode_indices( total_episodes: int, - splits: dict[str, float], -) -> dict[str, list[int]]: + splits: Mapping[str, float], +) -> Mapping[str, list[int]]: """Convert split fractions to episode indices.""" if sum(splits.values()) > 1.0: raise ValueError("Split fractions must sum to <= 1.0") @@ -840,7 +843,7 @@ def _copy_and_reindex_episodes_metadata( # array([array([array([0.])]), array([array([0.])]), array([array([0.])])]) # This happens particularly with image/video statistics. We need to detect and flatten # these nested structures back to proper (3, 1, 1) arrays so aggregate_stats can process them. - episode_stats = {} + episode_stats: dict[str, Any] = {} for key in src_episode_full: if key.startswith("stats/"): stat_key = key.replace("stats/", "") diff --git a/src/lerobot/datasets/factory.py b/src/lerobot/datasets/factory.py index f3ceb2b0c4..3bc4f7a9b8 100644 --- a/src/lerobot/datasets/factory.py +++ b/src/lerobot/datasets/factory.py @@ -20,11 +20,7 @@ from lerobot.configs.policies import PreTrainedConfig from lerobot.configs.train import TrainPipelineConfig -from lerobot.datasets.lerobot_dataset import ( - LeRobotDataset, - LeRobotDatasetMetadata, - MultiLeRobotDataset, -) +from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata, MultiLeRobotDataset from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset from lerobot.datasets.transforms import ImageTransforms from lerobot.utils.constants import ACTION, OBS_PREFIX, REWARD @@ -62,10 +58,7 @@ def resolve_delta_timestamps( if key.startswith(OBS_PREFIX) and cfg.observation_delta_indices is not None: delta_timestamps[key] = [i / ds_meta.fps for i in cfg.observation_delta_indices] - if len(delta_timestamps) == 0: - delta_timestamps = None - - return delta_timestamps + return delta_timestamps if len(delta_timestamps) > 0 else None def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDataset: @@ -88,6 +81,7 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas ds_meta = LeRobotDatasetMetadata( cfg.dataset.repo_id, root=cfg.dataset.root, revision=cfg.dataset.revision ) + assert cfg.policy is not None delta_timestamps = resolve_delta_timestamps(cfg.policy, ds_meta) if not cfg.dataset.streaming: dataset = LeRobotDataset( diff --git a/src/lerobot/datasets/image_writer.py b/src/lerobot/datasets/image_writer.py index ee10df6e19..3171b985a3 100644 --- a/src/lerobot/datasets/image_writer.py +++ b/src/lerobot/datasets/image_writer.py @@ -144,7 +144,7 @@ class AsyncImageWriter: def 
__init__(self, num_processes: int = 0, num_threads: int = 1): self.num_processes = num_processes self.num_threads = num_threads - self.queue = None + self.queue: queue.Queue | multiprocessing.JoinableQueue | None = None self.threads = [] self.processes = [] self._stopped = False @@ -170,6 +170,7 @@ def __init__(self, num_processes: int = 0, num_threads: int = 1): self.processes.append(p) def save_image(self, image: torch.Tensor | np.ndarray | PIL.Image.Image, fpath: Path): + assert self.queue is not None if isinstance(image, torch.Tensor): # Convert tensor to numpy array to minimize main process time image = image.cpu().numpy() diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py index c8bc5049ec..bbd3da5f4d 100644 --- a/src/lerobot/datasets/lerobot_dataset.py +++ b/src/lerobot/datasets/lerobot_dataset.py @@ -88,11 +88,11 @@ def __init__( force_cache_sync: bool = False, metadata_buffer_size: int = 10, ): - self.repo_id = repo_id - self.revision = revision if revision else CODEBASE_VERSION + self.repo_id: str = repo_id + self.revision: str | None = revision if revision else CODEBASE_VERSION self.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id - self.writer = None - self.latest_episode = None + self.writer: pq.ParquetWriter | None = None + self.latest_episode: dict | None = None self.metadata_buffer: list[dict] = [] self.metadata_buffer_size = metadata_buffer_size @@ -113,7 +113,7 @@ def _flush_metadata_buffer(self) -> None: if not hasattr(self, "metadata_buffer") or len(self.metadata_buffer) == 0: return - combined_dict = {} + combined_dict: dict[str, list] = {} for episode_dict in self.metadata_buffer: for key, value in episode_dict.items(): if key not in combined_dict: @@ -209,6 +209,7 @@ def get_video_file_path(self, ep_index: int, vid_key: str) -> Path: ep = self.episodes[ep_index] chunk_idx = ep[f"videos/{vid_key}/chunk_index"] file_idx = ep[f"videos/{vid_key}/file_index"] + assert self.video_path is not None fpath = self.video_path.format(video_key=vid_key, chunk_index=chunk_idx, file_index=file_idx) return Path(fpath) @@ -430,6 +431,7 @@ def update_video_info(self, video_key: str | None = None) -> None: video_keys = [video_key] if video_key is not None else self.video_keys for key in video_keys: if not self.features[key].get("info", None): + assert self.video_path is not None video_path = self.root / self.video_path.format(video_key=key, chunk_index=0, file_index=0) self.info["features"][key]["info"] = get_video_info(video_path) @@ -673,18 +675,20 @@ def __init__( self.delta_timestamps = delta_timestamps self.episodes = episodes self.tolerance_s = tolerance_s - self.revision = revision if revision else CODEBASE_VERSION + self.revision: str | None = revision if revision else CODEBASE_VERSION self.video_backend = video_backend if video_backend else get_safe_default_codec() self.delta_indices = None self.batch_encoding_size = batch_encoding_size self.episodes_since_last_encoding = 0 # Unused attributes - self.image_writer = None - self.episode_buffer = None - self.writer = None - self.latest_episode = None - self._current_file_start_frame = None # Track the starting frame index of the current parquet file + self.image_writer: AsyncImageWriter | None = None + self.episode_buffer: dict | None = None + self.writer: pq.ParquetWriter | None = None + self.latest_episode: dict | None = None + self._current_file_start_frame: int | None = ( + None # Track the starting frame index of the current parquet file + ) 
self.root.mkdir(exist_ok=True, parents=True) @@ -813,7 +817,7 @@ def download(self, download_videos: bool = True) -> None: files = self.get_episodes_file_paths() self.pull_from_repo(allow_patterns=files, ignore_patterns=ignore_patterns) - def get_episodes_file_paths(self) -> list[Path]: + def get_episodes_file_paths(self) -> list[str]: episodes = self.episodes if self.episodes is not None else list(range(self.meta.total_episodes)) fpaths = [str(self.meta.get_data_file_path(ep_idx)) for ep_idx in episodes] if len(self.meta.video_keys) > 0: @@ -869,7 +873,7 @@ def _check_cached_episodes_sufficient(self) -> bool: def create_hf_dataset(self) -> datasets.Dataset: features = get_hf_features_from_features(self.features) - ft_dict = {col: [] for col in features} + ft_dict: dict = {col: [] for col in features} hf_dataset = datasets.Dataset.from_dict(ft_dict, features=features, split="train") hf_dataset.set_transform(hf_transform_to_torch) return hf_dataset @@ -908,7 +912,10 @@ def hf_features(self) -> datasets.Features: else: return get_hf_features_from_features(self.features) - def _get_query_indices(self, idx: int, ep_idx: int) -> tuple[dict[str, list[int | bool]]]: + def _get_query_indices( + self, idx: int, ep_idx: int + ) -> tuple[dict[str, list[int]], dict[str, torch.BoolTensor]]: + assert self.delta_indices is not None ep = self.meta.episodes[ep_idx] ep_start = ep["dataset_from_index"] ep_end = ep["dataset_to_index"] @@ -1031,7 +1038,7 @@ def finalize(self): def create_episode_buffer(self, episode_index: int | None = None) -> dict: current_ep_idx = self.meta.total_episodes if episode_index is None else episode_index - ep_buffer = {} + ep_buffer: dict[str, int | list] = {} # size and task are special cases that are not in self.features ep_buffer["size"] = 0 ep_buffer["task"] = [] @@ -1072,6 +1079,8 @@ def add_frame(self, frame: dict) -> None: if self.episode_buffer is None: self.episode_buffer = self.create_episode_buffer() + assert self.episode_buffer is not None + # Automatically add frame_index and timestamp to episode buffer frame_index = self.episode_buffer["size"] timestamp = frame.pop("timestamp") if "timestamp" in frame else frame_index / self.fps @@ -1113,6 +1122,7 @@ def save_episode(self, episode_data: dict | None = None) -> None: None. 
""" episode_buffer = episode_data if episode_data is not None else self.episode_buffer + assert episode_buffer is not None validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) @@ -1293,6 +1303,7 @@ def _save_episode_data(self, episode_buffer: dict) -> dict: self.writer = pq.ParquetWriter( path, schema=table.schema, compression="snappy", use_dictionary=True ) + assert self.writer is not None self.writer.write_table(table) metadata = { @@ -1319,6 +1330,8 @@ def _save_episode_video(self, video_key: str, episode_index: int) -> dict: ep_size_in_mb = get_file_size_in_mb(ep_path) ep_duration_in_s = get_video_duration_in_s(ep_path) + assert self.meta.video_path is not None + if ( episode_index == 0 or self.meta.latest_episode is None @@ -1345,7 +1358,6 @@ def _save_episode_video(self, video_key: str, episode_index: int) -> dict: latest_ep = self.meta.latest_episode chunk_idx = latest_ep[f"videos/{video_key}/chunk_index"][0] file_idx = latest_ep[f"videos/{video_key}/file_index"][0] - latest_path = self.root / self.meta.video_path.format( video_key=video_key, chunk_index=chunk_idx, file_index=file_idx ) @@ -1386,6 +1398,7 @@ def _save_episode_video(self, video_key: str, episode_index: int) -> dict: return metadata def clear_episode_buffer(self, delete_images: bool = True) -> None: + assert self.episode_buffer is not None # Clean up image files for the current episode buffer if delete_images: # Wait for the async image writer to finish diff --git a/src/lerobot/datasets/online_buffer.py b/src/lerobot/datasets/online_buffer.py index 563d800b92..498132d88a 100644 --- a/src/lerobot/datasets/online_buffer.py +++ b/src/lerobot/datasets/online_buffer.py @@ -73,8 +73,8 @@ class OnlineBuffer(torch.utils.data.Dataset): def __init__( self, write_dir: str | Path, - data_spec: dict[str, Any] | None, - buffer_capacity: int | None, + data_spec: dict[str, Any], + buffer_capacity: int, fps: float | None = None, delta_timestamps: dict[str, list[float]] | dict[str, np.ndarray] | None = None, ): @@ -98,16 +98,17 @@ def __init__( converted to dict[str, np.ndarray] for optimization purposes. """ + self._delta_timestamps: dict[str, np.ndarray] | None = None self.set_delta_timestamps(delta_timestamps) self._fps = fps # Tolerance in seconds used to discard loaded frames when their timestamps are not close enough from # the requested frames. It is only used when `delta_timestamps` is provided. # minus 1e-4 to account for possible numerical error - self.tolerance_s = 1 / self.fps - 1e-4 if fps is not None else None + self.tolerance_s = 1 / self.fps - 1e-4 if self.fps is not None else None self._buffer_capacity = buffer_capacity data_spec = self._make_data_spec(data_spec, buffer_capacity) Path(write_dir).mkdir(parents=True, exist_ok=True) - self._data = {} + self._data: dict[str, Any] = {} for k, v in data_spec.items(): self._data[k] = _make_memmap_safe( filename=Path(write_dir) / k, @@ -120,7 +121,7 @@ def __init__( def delta_timestamps(self) -> dict[str, np.ndarray] | None: return self._delta_timestamps - def set_delta_timestamps(self, value: dict[str, list[float]] | None): + def set_delta_timestamps(self, value: dict[str, list[float]] | dict[str, np.ndarray] | None): """Set delta_timestamps converting the values to numpy arrays. The conversion is for an optimization in the __getitem__. The loop is much slower if the arrays @@ -194,10 +195,10 @@ def add_data(self, data: dict[str, np.ndarray]): data[OnlineBuffer.INDEX_KEY] += last_data_index + 1 # Insert the new data starting from next_index. 
It may be necessary to wrap around to the start. - n_surplus = max(0, new_data_length - (self._buffer_capacity - next_index)) + n_surplus: int = max(0, new_data_length - (self._buffer_capacity - next_index)) for k in self.data_keys: if n_surplus == 0: - slc = slice(next_index, next_index + new_data_length) + slc: slice = slice(next_index, next_index + new_data_length) self._data[k][slc] = data[k] self._data[OnlineBuffer.OCCUPANCY_MASK_KEY][slc] = True else: @@ -335,7 +336,7 @@ def compute_sampler_weights( weights = [] if len(offline_dataset) > 0: - offline_data_mask_indices = [] + offline_data_mask_indices: list[int] = [] for start_index, end_index in zip( offline_dataset.meta.episodes["dataset_from_index"], offline_dataset.meta.episodes["dataset_to_index"], @@ -353,7 +354,7 @@ def compute_sampler_weights( ) if online_dataset is not None and len(online_dataset) > 0: - online_data_mask_indices = [] + online_data_mask_indices: list[int] = [] episode_indices = online_dataset.get_data_by_key("episode_index") for episode_idx in torch.unique(episode_indices): where_episode = torch.where(episode_indices == episode_idx) diff --git a/src/lerobot/datasets/pipeline_features.py b/src/lerobot/datasets/pipeline_features.py index 4fad7bd20b..0a9d9c039e 100644 --- a/src/lerobot/datasets/pipeline_features.py +++ b/src/lerobot/datasets/pipeline_features.py @@ -35,7 +35,10 @@ def create_initial_features( Returns: The initial features dictionary structured by PipelineFeatureType. """ - features = {PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}} + features: dict[PipelineFeatureType, dict[str, Any]] = { + PipelineFeatureType.ACTION: {}, + PipelineFeatureType.OBSERVATION: {}, + } if action: features[PipelineFeatureType.ACTION] = action if observation: @@ -44,13 +47,13 @@ def create_initial_features( # Helper to filter state/action keys based on regex patterns. -def should_keep(key: str, patterns: tuple[str]) -> bool: +def should_keep(key: str, patterns: Sequence[str] | None) -> bool: if patterns is None: return True return any(re.search(pat, key) for pat in patterns) -def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str: +def strip_prefix(key: str, prefixes_to_strip: tuple[str, ...]) -> str: for prefix in prefixes_to_strip: if key.startswith(prefix): return key[len(prefix) :] diff --git a/src/lerobot/datasets/push_dataset_to_hub/utils.py b/src/lerobot/datasets/push_dataset_to_hub/utils.py index 48214e1bf2..67a634a4bc 100644 --- a/src/lerobot/datasets/push_dataset_to_hub/utils.py +++ b/src/lerobot/datasets/push_dataset_to_hub/utils.py @@ -31,7 +31,7 @@ def calculate_episode_data_index(hf_dataset: datasets.Dataset) -> dict[str, torc - "from": A tensor containing the starting index of each episode. - "to": A tensor containing the ending index of each episode. """ - episode_data_index = {"from": [], "to": []} + episode_data_index: dict[str, list | torch.Tensor] = {"from": [], "to": []} current_episode = None """ diff --git a/src/lerobot/datasets/sampler.py b/src/lerobot/datasets/sampler.py index d0bb20c27e..2088915ff5 100644 --- a/src/lerobot/datasets/sampler.py +++ b/src/lerobot/datasets/sampler.py @@ -39,7 +39,7 @@ def __init__( drop_n_last_frames: Number of frames to drop from the end of each episode. shuffle: Whether to shuffle the indices. 
""" - indices = [] + indices: list[int] = [] for episode_idx, (start_index, end_index) in enumerate( zip(dataset_from_indices, dataset_to_indices, strict=True) ): diff --git a/src/lerobot/datasets/streaming_dataset.py b/src/lerobot/datasets/streaming_dataset.py index 454389d46f..548b4dd595 100644 --- a/src/lerobot/datasets/streaming_dataset.py +++ b/src/lerobot/datasets/streaming_dataset.py @@ -33,10 +33,7 @@ item_to_torch, safe_shard, ) -from lerobot.datasets.video_utils import ( - VideoDecoderCache, - decode_video_frames_torchcodec, -) +from lerobot.datasets.video_utils import VideoDecoderCache, decode_video_frames_torchcodec from lerobot.utils.constants import HF_LEROBOT_HOME, LOOKAHEAD_BACKTRACKTABLE, LOOKBACK_BACKTRACKTABLE @@ -84,7 +81,7 @@ def __init__( root: str | Path | None = None, episodes: list[int] | None = None, image_transforms: Callable | None = None, - delta_timestamps: dict[list[float]] | None = None, + delta_timestamps: dict[str, list[float]] | None = None, tolerance_s: float = 1e-4, revision: str | None = None, force_cache_sync: bool = False, @@ -130,7 +127,7 @@ def __init__( self.buffer_size = buffer_size # We cache the video decoders to avoid re-initializing them at each frame (avoiding a ~10x slowdown) - self.video_decoder_cache = None + self.video_decoder_cache: VideoDecoderCache | None = None self.root.mkdir(exist_ok=True, parents=True) @@ -205,7 +202,7 @@ def __iter__(self) -> Iterator[dict[str, torch.Tensor]]: # the logic is to add 2 levels of randomness: # (1) sample one shard at random from the ones available, and # (2) sample one frame from the shard sampled at (1) - frames_buffer = [] + frames_buffer: list[dict] = [] while available_shards := list(idx_to_backtrack_dataset.keys()): shard_key = next(self._infinite_generator_over_elements(rng, available_shards)) backtrack_dataset = idx_to_backtrack_dataset[shard_key] # selects which shard to iterate on @@ -258,6 +255,7 @@ def _make_timestamps_from_indices( self, start_ts: float, indices: dict[str, list[int]] | None = None ) -> dict[str, list[float]]: if indices is not None: + assert self.delta_timestamps is not None # self.delta_timestamps has a default value of None return { key: ( start_ts + torch.tensor(indices[key]) / self.fps @@ -365,6 +363,8 @@ def _get_query_timestamps( ) -> dict[str, list[float]]: query_timestamps = {} keys_to_timestamps = self._make_timestamps_from_indices(current_ts, query_indices) + # TODO(#1722): Rewrite this method without adding a default value of None to episode_boundaries_ts + assert episode_boundaries_ts is not None for key in self.meta.video_keys: if query_indices is not None and key in query_indices: timestamps = keys_to_timestamps[key] @@ -414,6 +414,8 @@ def _get_delta_frames(self, dataset_iterator: Backtrackable, current_item: dict) query_result = {} padding = {} + assert self.delta_indices is not None + for key, delta_indices in self.delta_indices.items(): if key in self.meta.video_keys: continue # visual frames are decoded separately @@ -507,7 +509,7 @@ def _get_delta_frames(self, dataset_iterator: Backtrackable, current_item: dict) return query_result, padding - def _validate_delta_timestamp_keys(self, delta_timestamps: dict[list[float]]) -> None: + def _validate_delta_timestamp_keys(self, delta_timestamps: dict[str, list[float]]) -> None: """ Validate that all keys in delta_timestamps correspond to actual features in the dataset. 
diff --git a/src/lerobot/datasets/transforms.py b/src/lerobot/datasets/transforms.py index beacc48d98..653bd60b39 100644 --- a/src/lerobot/datasets/transforms.py +++ b/src/lerobot/datasets/transforms.py @@ -69,7 +69,7 @@ def __init__( self.n_subset = n_subset self.random_order = random_order - self.selected_transforms = None + self.selected_transforms: list[Any] | None = None def forward(self, *inputs: Any) -> Any: needs_unpacking = len(inputs) > 1 diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py index 37d8432b2b..abff175512 100644 --- a/src/lerobot/datasets/utils.py +++ b/src/lerobot/datasets/utils.py @@ -153,7 +153,7 @@ def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict: Returns: dict: A flattened dictionary. """ - items = [] + items: list[tuple[str, Any]] = [] for k, v in d.items(): new_key = f"{parent_key}{sep}{k}" if parent_key else k if isinstance(v, dict): @@ -178,7 +178,7 @@ def unflatten_dict(d: dict, sep: str = "/") -> dict: Returns: dict: A nested dictionary. """ - outdict = {} + outdict: dict = {} for key, value in d.items(): parts = key.split(sep) d = outdict @@ -371,7 +371,7 @@ def load_episodes(local_dir: Path) -> datasets.Dataset: def load_image_as_numpy( - fpath: str | Path, dtype: np.dtype = np.float32, channel_first: bool = True + fpath: str | Path, dtype: np.typing.DTypeLike = np.float32, channel_first: bool = True ) -> np.ndarray: """Load an image from a file into a numpy array. @@ -1034,17 +1034,20 @@ def validate_feature_dtype_and_shape( expected_dtype = feature["dtype"] expected_shape = feature["shape"] if is_valid_numpy_dtype_string(expected_dtype): + assert isinstance(value, (np.ndarray, np.number, float)) return validate_feature_numpy_array(name, expected_dtype, expected_shape, value) elif expected_dtype in ["image", "video"]: + assert isinstance(value, (np.ndarray, PILImage.Image)) return validate_feature_image_or_video(name, expected_shape, value) elif expected_dtype == "string": + assert isinstance(value, str) return validate_feature_string(name, value) else: raise NotImplementedError(f"The feature dtype '{expected_dtype}' is not implemented yet.") def validate_feature_numpy_array( - name: str, expected_dtype: str, expected_shape: list[int], value: np.ndarray + name: str, expected_dtype: str, expected_shape: list[int], value: np.ndarray | np.number | float ) -> str: """Validate a feature that is expected to be a numpy array. 
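# Many hunks above add `assert x is not None` or `assert isinstance(...)` right before a
# value is used. This is the standard mypy narrowing pattern for attributes that are
# annotated as `T | None` and filled in lazily. A small illustrative class (not library
# code) showing why the assert is needed when initialization is guarded by something mypy
# cannot reason about, such as a counter:
import numpy as np


class LazyRunningMean:
    def __init__(self) -> None:
        self._count = 0
        self._mean: np.ndarray | None = None  # created on the first update

    def update(self, batch: np.ndarray) -> None:
        n = batch.shape[0]
        if self._count == 0:
            self._mean = batch.mean(axis=0)
        else:
            # mypy cannot infer from `_count` that `_mean` was set, so the assert
            # narrows `np.ndarray | None` to `np.ndarray` here.
            assert self._mean is not None
            self._mean = (self._mean * self._count + batch.sum(axis=0)) / (self._count + n)
        self._count += n

    def mean(self) -> np.ndarray:
        assert self._mean is not None, "call update() at least once first"
        return self._mean


stats = LazyRunningMean()
stats.update(np.ones((4, 3)))
print(stats.mean())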
diff --git a/src/lerobot/datasets/v30/augment_dataset_quantile_stats.py b/src/lerobot/datasets/v30/augment_dataset_quantile_stats.py index 900a43a4f0..e44a478ae7 100644 --- a/src/lerobot/datasets/v30/augment_dataset_quantile_stats.py +++ b/src/lerobot/datasets/v30/augment_dataset_quantile_stats.py @@ -41,7 +41,9 @@ import numpy as np import torch from huggingface_hub import HfApi -from requests import HTTPError + +# TODO(#1722): Install library stubs for requests +from requests import HTTPError # type: ignore[import-untyped] from tqdm import tqdm from lerobot.datasets.compute_stats import DEFAULT_QUANTILES, aggregate_stats, get_feature_stats @@ -104,6 +106,7 @@ def process_single_episode(dataset: LeRobotDataset, episode_idx: int) -> dict: continue data = torch.stack(data_list).cpu().numpy() + axes_to_reduce: int | tuple[int, int, int] if dataset.features[key]["dtype"] in ["image", "video"]: if data.dtype == np.uint8: data = data.astype(np.float32) / 255.0 diff --git a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py index 74be6bfa4d..93ea5a8014 100644 --- a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py +++ b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py @@ -54,7 +54,9 @@ import tqdm from datasets import Dataset, Features, Image from huggingface_hub import HfApi, snapshot_download -from requests import HTTPError + +# TODO(#1722): Install library stubs for requests +from requests import HTTPError # type: ignore[import-untyped] from lerobot.datasets.compute_stats import aggregate_stats from lerobot.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset @@ -148,8 +150,8 @@ def legacy_load_episodes_stats(local_dir: Path) -> dict: def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]: - tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) - tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])} + legacy_tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) + tasks = {item["task_index"]: item["task"] for item in sorted(legacy_tasks, key=lambda x: x["task_index"])} task_to_task_index = {task: task_index for task_index, task in tasks.items()} return tasks, task_to_task_index @@ -204,7 +206,7 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int): ep_idx = 0 chunk_idx = 0 file_idx = 0 - size_in_mb = 0 + size_in_mb: float = 0 num_frames = 0 paths_to_cat = [] episodes_metadata = [] @@ -304,10 +306,10 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_f ep_idx = 0 chunk_idx = 0 file_idx = 0 - size_in_mb = 0 + size_in_mb: float = 0 duration_in_s = 0.0 - paths_to_cat = [] - episodes_metadata = [] + paths_to_cat: list[Path | str] = [] + episodes_metadata: list[dict[str, Any]] = [] for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"): ep_size_in_mb = get_file_size_in_mb(ep_path) diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 0de791919a..8756734e4e 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import glob -import importlib +import importlib.util import logging import shutil import tempfile @@ -174,7 +174,7 @@ def decode_video_frames_torchvision( class VideoDecoderCache: """Thread-safe cache for video decoders to avoid expensive re-initialization.""" - def __init__(self): + def __init__(self) -> None: self._cache: dict[str, tuple[Any, Any]] = {} self._lock = Lock() @@ -429,7 +429,7 @@ def concatenate_video_files( with tempfile.NamedTemporaryFile(mode="w", suffix=".ffconcat", delete=False) as tmp_concatenate_file: tmp_concatenate_file.write("ffconcat version 1.0\n") for input_path in input_video_paths: - tmp_concatenate_file.write(f"file '{str(input_path.resolve())}'\n") + tmp_concatenate_file.write(f"file '{str(Path(input_path).resolve())}'\n") tmp_concatenate_file.flush() tmp_concatenate_path = tmp_concatenate_file.name
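# The video_utils.py hunk above tightens `import importlib` to `import importlib.util`.
# A submodule is not guaranteed to be bound as an attribute after importing only the
# parent package, and type checkers generally want the explicit form. A small sketch of
# the availability-check idiom this typically supports (the helper name is illustrative,
# not necessarily what video_utils.py does with it):
import importlib.util


def is_package_available(name: str) -> bool:
    # find_spec returns None when the module cannot be located
    return importlib.util.find_spec(name) is not None


print(is_package_available("numpy"), is_package_available("definitely_missing_pkg"))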