diff --git a/examples/decoding/basic_example.py b/examples/decoding/basic_example.py index 8440b6814..86fa8e6e4 100644 --- a/examples/decoding/basic_example.py +++ b/examples/decoding/basic_example.py @@ -18,7 +18,6 @@ # plotting utility. You can ignore that part and jump right below to # :ref:`creating_decoder`. -from typing import Optional import torch import requests @@ -33,7 +32,7 @@ raw_video_bytes = response.content -def plot(frames: torch.Tensor, title : Optional[str] = None): +def plot(frames: torch.Tensor, title: str | None = None): try: from torchvision.utils import make_grid from torchvision.transforms.v2.functional import to_pil_image diff --git a/examples/decoding/custom_frame_mappings.py b/examples/decoding/custom_frame_mappings.py index a62bc9eb0..b41bf2342 100644 --- a/examples/decoding/custom_frame_mappings.py +++ b/examples/decoding/custom_frame_mappings.py @@ -82,7 +82,15 @@ # Lets define a simple function to run ffprobe on a video's first stream index, then writes the results in output_json_path. def generate_frame_mappings(video_path, output_json_path, stream_index): - ffprobe_cmd = ["ffprobe", "-i", f"{video_path}", "-select_streams", f"{stream_index}", "-show_frames", "-show_entries", "frame=pts,duration,key_frame", "-of", "json"] + ffprobe_cmd = [ + "ffprobe", + "-i", f"{video_path}", + "-select_streams", f"{stream_index}", + "-show_frames", + "-show_entries", + "frame=pts,duration,key_frame", + "-of", "json", + ] print(f"Running ffprobe:\n{' '.join(ffprobe_cmd)}\n") ffprobe_result = subprocess.run(ffprobe_cmd, check=True, capture_output=True, text=True) with open(output_json_path, "w") as f: @@ -157,7 +165,7 @@ def bench(f, file_like=False, average_over=50, warmup=2, **f_kwargs): # so the performance benefits are realized. -def decode_frames(video_path, seek_mode = "exact", custom_frame_mappings = None): +def decode_frames(video_path, seek_mode="exact", custom_frame_mappings=None): decoder = VideoDecoder( source=video_path, seek_mode=seek_mode, diff --git a/examples/decoding/parallel_decoding.py b/examples/decoding/parallel_decoding.py index b5699a895..12e567231 100644 --- a/examples/decoding/parallel_decoding.py +++ b/examples/decoding/parallel_decoding.py @@ -31,7 +31,6 @@ # require efficient processing. You can ignore that part and jump right below to # :ref:`start_parallel_decoding`. -from typing import List import torch import requests import tempfile @@ -74,7 +73,7 @@ def report_stats(times, unit="s"): return med -def split_indices(indices: List[int], num_chunks: int) -> List[List[int]]: +def split_indices(indices: list[int], num_chunks: int) -> list[list[int]]: """Split a list of indices into approximately equal chunks.""" chunk_size = len(indices) // num_chunks chunks = [] @@ -155,7 +154,8 @@ def generate_long_video(temp_dir: str): # Let's start with a sequential approach as our baseline. This processes # frames one by one without any parallelization. -def decode_sequentially(indices: List[int], video_path=long_video_path): + +def decode_sequentially(indices: list[int], video_path=long_video_path): """Decode frames sequentially using a single decoder instance.""" decoder = VideoDecoder(video_path, seek_mode="approximate") return decoder.get_frames_at(indices) @@ -173,8 +173,9 @@ def decode_sequentially(indices: List[int], video_path=long_video_path): # via the ``num_ffmpeg_threads`` parameter. This approach uses multiple # threads within FFmpeg itself to accelerate decoding operations. 
+ def decode_with_ffmpeg_parallelism( - indices: List[int], + indices: list[int], num_threads: int, video_path=long_video_path ): @@ -197,10 +198,11 @@ def decode_with_ffmpeg_parallelism( # # Process-based parallelism distributes work across multiple Python processes. + def decode_with_multiprocessing( - indices: List[int], + indices: list[int], num_processes: int, - video_path=long_video_path + video_path=long_video_path, ): """Decode frames using multiple processes with joblib.""" chunks = split_indices(indices, num_chunks=num_processes) @@ -226,8 +228,9 @@ def decode_with_multiprocessing( # Thread-based parallelism uses multiple threads within a single process. # TorchCodec releases the GIL, so this can be very effective. + def decode_with_multithreading( - indices: List[int], + indices: list[int], num_threads: int, video_path=long_video_path ): diff --git a/examples/decoding/sampling.py b/examples/decoding/sampling.py index 8fcd261e6..2f5adba36 100644 --- a/examples/decoding/sampling.py +++ b/examples/decoding/sampling.py @@ -19,7 +19,6 @@ # plotting utility. You can ignore that part and jump right below to # :ref:`sampling_tuto_start`. -from typing import Optional import torch import requests @@ -34,7 +33,7 @@ raw_video_bytes = response.content -def plot(frames: torch.Tensor, title : Optional[str] = None): +def plot(frames: torch.Tensor, title: str | None = None): try: from torchvision.utils import make_grid from torchvision.transforms.v2.functional import to_pil_image diff --git a/pyproject.toml b/pyproject.toml index 2f8b4da45..11e1908e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "torchcodec" description = "A video decoder for PyTorch" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.10" license-files = ["LICENSE"] authors = [ { name = "PyTorch Team", email = "packages@pytorch.org" }, @@ -32,7 +32,7 @@ dev = [ first_party_detection = false [tool.black] -target-version = ["py38"] +target-version = ["py310"] [tool.ufmt] diff --git a/src/torchcodec/_core/_metadata.py b/src/torchcodec/_core/_metadata.py index 1d5a7d103..482d0e1cb 100644 --- a/src/torchcodec/_core/_metadata.py +++ b/src/torchcodec/_core/_metadata.py @@ -4,12 +4,12 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. + import dataclasses import json import pathlib from dataclasses import dataclass from fractions import Fraction -from typing import List, Optional, Union import torch @@ -25,21 +25,21 @@ @dataclass class StreamMetadata: - duration_seconds_from_header: Optional[float] + duration_seconds_from_header: float | None """Duration of the stream, in seconds, obtained from the header (float or None). This could be inaccurate.""" - begin_stream_seconds_from_header: Optional[float] + begin_stream_seconds_from_header: float | None """Beginning of the stream, in seconds, obtained from the header (float or None). Usually, this is equal to 0.""" - bit_rate: Optional[float] + bit_rate: float | None """Bit rate of the stream, in seconds (float or None).""" - codec: Optional[str] + codec: str | None """Codec (str or None).""" stream_index: int """Index of the stream that this metadata refers to (int).""" # Computed fields (computed in C++ with fallback logic) - duration_seconds: Optional[float] + duration_seconds: float | None """Duration of the stream in seconds. We try to calculate the duration from the actual frames if a :term:`scan` was performed. 
Otherwise we fall back to ``duration_seconds_from_header``. If that value is also None, @@ -47,7 +47,7 @@ class StreamMetadata: ``average_fps_from_header``. If all of those are unavailable, we fall back to the container-level ``duration_seconds_from_header``. """ - begin_stream_seconds: Optional[float] + begin_stream_seconds: float | None """Beginning of the stream, in seconds (float). Conceptually, this corresponds to the first frame's :term:`pts`. If a :term:`scan` was performed and ``begin_stream_seconds_from_content`` is not None, then it is returned. @@ -65,12 +65,12 @@ def __repr__(self): class VideoStreamMetadata(StreamMetadata): """Metadata of a single video stream.""" - begin_stream_seconds_from_content: Optional[float] + begin_stream_seconds_from_content: float | None """Beginning of the stream, in seconds (float or None). Conceptually, this corresponds to the first frame's :term:`pts`. It is only computed when a :term:`scan` is done as min(frame.pts) across all frames in the stream. Usually, this is equal to 0.""" - end_stream_seconds_from_content: Optional[float] + end_stream_seconds_from_content: float | None """End of the stream, in seconds (float or None). Conceptually, this corresponds to last_frame.pts + last_frame.duration. It is only computed when a :term:`scan` is done as max(frame.pts + @@ -81,42 +81,42 @@ class VideoStreamMetadata(StreamMetadata): simply indexing the :class:`~torchcodec.decoders.VideoDecoder` object with ``[-1]``. """ - width: Optional[int] + width: int | None """Width of the frames (int or None).""" - height: Optional[int] + height: int | None """Height of the frames (int or None).""" - num_frames_from_header: Optional[int] + num_frames_from_header: int | None """Number of frames, from the stream's metadata. This is potentially inaccurate. We recommend using the ``num_frames`` attribute instead. (int or None).""" - num_frames_from_content: Optional[int] + num_frames_from_content: int | None """Number of frames computed by TorchCodec by scanning the stream's content (the scan doesn't involve decoding). This is more accurate than ``num_frames_from_header``. We recommend using the ``num_frames`` attribute instead. (int or None).""" - average_fps_from_header: Optional[float] + average_fps_from_header: float | None """Averate fps of the stream, obtained from the header (float or None). We recommend using the ``average_fps`` attribute instead.""" - pixel_aspect_ratio: Optional[Fraction] + pixel_aspect_ratio: Fraction | None """Pixel Aspect Ratio (PAR), also known as Sample Aspect Ratio (SAR --- not to be confused with Storage Aspect Ratio, also SAR), is the ratio between the width and height of each pixel (``fractions.Fraction`` or None).""" # Computed fields (computed in C++ with fallback logic) - end_stream_seconds: Optional[float] + end_stream_seconds: float | None """End of the stream, in seconds (float or None). Conceptually, this corresponds to last_frame.pts + last_frame.duration. If :term:`scan` was performed and``end_stream_seconds_from_content`` is not None, then that value is returned. Otherwise, returns ``duration_seconds``. """ - num_frames: Optional[int] + num_frames: int | None """Number of frames in the stream (int or None). This corresponds to ``num_frames_from_content`` if a :term:`scan` was made, otherwise it corresponds to ``num_frames_from_header``. If that value is also None, the number of frames is calculated from the duration and the average fps. 
""" - average_fps: Optional[float] + average_fps: float | None """Average fps of the stream. If a :term:`scan` was perfomed, this is computed from the number of frames and the duration of the stream. Otherwise we fall back to ``average_fps_from_header``. @@ -130,11 +130,11 @@ def __repr__(self): class AudioStreamMetadata(StreamMetadata): """Metadata of a single audio stream.""" - sample_rate: Optional[int] + sample_rate: int | None """The original sample rate.""" - num_channels: Optional[int] + num_channels: int | None """The number of channels (1 for mono, 2 for stereo, etc.)""" - sample_format: Optional[str] + sample_format: str | None """The original sample format, as described by FFmpeg. E.g. 'fltp', 's32', etc.""" def __repr__(self): @@ -143,19 +143,19 @@ def __repr__(self): @dataclass class ContainerMetadata: - duration_seconds_from_header: Optional[float] - bit_rate_from_header: Optional[float] - best_video_stream_index: Optional[int] - best_audio_stream_index: Optional[int] + duration_seconds_from_header: float | None + bit_rate_from_header: float | None + best_video_stream_index: int | None + best_audio_stream_index: int | None - streams: List[StreamMetadata] + streams: list[StreamMetadata] @property - def duration_seconds(self) -> Optional[float]: + def duration_seconds(self) -> float | None: raise NotImplementedError("Decide on logic and implement this!") @property - def bit_rate(self) -> Optional[float]: + def bit_rate(self) -> float | None: raise NotImplementedError("Decide on logic and implement this!") @property @@ -195,7 +195,7 @@ def get_container_metadata(decoder: torch.Tensor) -> ContainerMetadata: """ container_dict = json.loads(_get_container_json_metadata(decoder)) - streams_metadata: List[StreamMetadata] = [] + streams_metadata: list[StreamMetadata] = [] for stream_index in range(container_dict["numStreams"]): stream_dict = json.loads(_get_stream_json_metadata(decoder, stream_index)) common_meta = dict( @@ -255,7 +255,7 @@ def get_container_metadata(decoder: torch.Tensor) -> ContainerMetadata: def get_container_metadata_from_header( - filename: Union[str, pathlib.Path] + filename: str | pathlib.Path, ) -> ContainerMetadata: return get_container_metadata( create_from_file(str(filename), seek_mode="approximate") diff --git a/src/torchcodec/_core/ops.py b/src/torchcodec/_core/ops.py index 921f5ee54..3a7906434 100644 --- a/src/torchcodec/_core/ops.py +++ b/src/torchcodec/_core/ops.py @@ -4,11 +4,11 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. + import io import json import warnings from types import ModuleType -from typing import List, Optional, Tuple, Union import torch from torch.library import get_ctx, register_fake @@ -19,7 +19,7 @@ _load_pybind11_module, ) -_pybind_ops: Optional[ModuleType] = None +_pybind_ops: ModuleType | None = None def load_torchcodec_shared_libraries(): @@ -149,9 +149,7 @@ def load_torchcodec_shared_libraries(): # ============================= # Functions not related to custom ops, but similar implementation to c++ ops # ============================= -def create_from_bytes( - video_bytes: bytes, seek_mode: Optional[str] = None -) -> torch.Tensor: +def create_from_bytes(video_bytes: bytes, seek_mode: str | None = None) -> torch.Tensor: with warnings.catch_warnings(): # Ignore warning stating that the underlying video_bytes buffer is # non-writable. 
@@ -161,7 +159,7 @@ def create_from_bytes( def create_from_file_like( - file_like: Union[io.RawIOBase, io.BufferedReader], seek_mode: Optional[str] = None + file_like: io.RawIOBase | io.BufferedReader, seek_mode: str | None = None ) -> torch.Tensor: assert _pybind_ops is not None return _create_from_file_like( @@ -176,10 +174,10 @@ def encode_audio_to_file_like( samples: torch.Tensor, sample_rate: int, format: str, - file_like: Union[io.RawIOBase, io.BufferedIOBase], - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - desired_sample_rate: Optional[int] = None, + file_like: io.RawIOBase | io.BufferedIOBase, + bit_rate: int | None = None, + num_channels: int | None = None, + desired_sample_rate: int | None = None, ) -> None: """Encode audio samples to a file-like object. @@ -212,12 +210,12 @@ def encode_video_to_file_like( frames: torch.Tensor, frame_rate: float, format: str, - file_like: Union[io.RawIOBase, io.BufferedIOBase], - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - preset: Optional[str] = None, - extra_options: Optional[list[str]] = None, + file_like: io.RawIOBase | io.BufferedIOBase, + codec: str | None = None, + pixel_format: str | None = None, + crf: int | float | None = None, + preset: str | None = None, + extra_options: list[str] | None = None, ) -> None: """Encode video frames to a file-like object. @@ -248,8 +246,8 @@ def encode_video_to_file_like( def get_frames_at_indices( - decoder: torch.Tensor, *, frame_indices: Union[torch.Tensor, list[int]] -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + decoder: torch.Tensor, *, frame_indices: torch.Tensor | list[int] +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: if isinstance(frame_indices, torch.Tensor): # Ensure indices is the correct dtype (int64) frame_indices = frame_indices.to(torch.int64) @@ -260,8 +258,8 @@ def get_frames_at_indices( def get_frames_by_pts( - decoder: torch.Tensor, *, timestamps: Union[torch.Tensor, list[float]] -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + decoder: torch.Tensor, *, timestamps: torch.Tensor | list[float] +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: if isinstance(timestamps, torch.Tensor): # Ensure indices is the correct dtype (float64) timestamps = timestamps.to(torch.float64) @@ -278,13 +276,13 @@ def get_frames_by_pts( # Abstract impl for the operators. Needed by torch.compile. 
# ============================== @register_fake("torchcodec_ns::create_from_file") -def create_from_file_abstract(filename: str, seek_mode: Optional[str]) -> torch.Tensor: +def create_from_file_abstract(filename: str, seek_mode: str | None) -> torch.Tensor: return torch.empty([], dtype=torch.long) @register_fake("torchcodec_ns::_create_from_file_like") def _create_from_file_like_abstract( - file_like: int, seek_mode: Optional[str] + file_like: int, seek_mode: str | None ) -> torch.Tensor: return torch.empty([], dtype=torch.long) @@ -294,9 +292,9 @@ def encode_audio_to_file_abstract( samples: torch.Tensor, sample_rate: int, filename: str, - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - desired_sample_rate: Optional[int] = None, + bit_rate: int | None = None, + num_channels: int | None = None, + desired_sample_rate: int | None = None, ) -> None: return @@ -306,9 +304,9 @@ def encode_audio_to_tensor_abstract( samples: torch.Tensor, sample_rate: int, format: str, - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - desired_sample_rate: Optional[int] = None, + bit_rate: int | None = None, + num_channels: int | None = None, + desired_sample_rate: int | None = None, ) -> torch.Tensor: return torch.empty([], dtype=torch.long) @@ -319,9 +317,9 @@ def _encode_audio_to_file_like_abstract( sample_rate: int, format: str, file_like_context: int, - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - desired_sample_rate: Optional[int] = None, + bit_rate: int | None = None, + num_channels: int | None = None, + desired_sample_rate: int | None = None, ) -> None: return @@ -331,11 +329,11 @@ def encode_video_to_file_abstract( frames: torch.Tensor, frame_rate: float, filename: str, - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - preset: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - extra_options: Optional[list[str]] = None, + codec: str | None = None, + pixel_format: str | None = None, + preset: str | None = None, + crf: int | float | None = None, + extra_options: list[str] | None = None, ) -> None: return @@ -345,11 +343,11 @@ def encode_video_to_tensor_abstract( frames: torch.Tensor, frame_rate: float, format: str, - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - preset: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - extra_options: Optional[list[str]] = None, + codec: str | None = None, + pixel_format: str | None = None, + preset: str | None = None, + crf: int | float | None = None, + extra_options: list[str] | None = None, ) -> torch.Tensor: return torch.empty([], dtype=torch.long) @@ -360,18 +358,18 @@ def _encode_video_to_file_like_abstract( frame_rate: float, format: str, file_like_context: int, - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - preset: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - extra_options: Optional[list[str]] = None, + codec: str | None = None, + pixel_format: str | None = None, + preset: str | None = None, + crf: int | float | None = None, + extra_options: list[str] | None = None, ) -> None: return @register_fake("torchcodec_ns::create_from_tensor") def create_from_tensor_abstract( - video_tensor: torch.Tensor, seek_mode: Optional[str] + video_tensor: torch.Tensor, seek_mode: str | None ) -> torch.Tensor: return torch.empty([], dtype=torch.long) @@ -380,16 +378,16 @@ def create_from_tensor_abstract( def _add_video_stream_abstract( decoder: torch.Tensor, *, - num_threads: Optional[int] = 
None, - dimension_order: Optional[str] = None, - stream_index: Optional[int] = None, + num_threads: int | None = None, + dimension_order: str | None = None, + stream_index: int | None = None, device: str = "cpu", device_variant: str = "ffmpeg", transform_specs: str = "", - custom_frame_mappings: Optional[ - tuple[torch.Tensor, torch.Tensor, torch.Tensor] - ] = None, - color_conversion_library: Optional[str] = None, + custom_frame_mappings: ( + tuple[torch.Tensor, torch.Tensor, torch.Tensor] | None + ) = None, + color_conversion_library: str | None = None, ) -> None: return @@ -398,15 +396,15 @@ def _add_video_stream_abstract( def add_video_stream_abstract( decoder: torch.Tensor, *, - num_threads: Optional[int] = None, - dimension_order: Optional[str] = None, - stream_index: Optional[int] = None, + num_threads: int | None = None, + dimension_order: str | None = None, + stream_index: int | None = None, device: str = "cpu", device_variant: str = "ffmpeg", transform_specs: str = "", - custom_frame_mappings: Optional[ - tuple[torch.Tensor, torch.Tensor, torch.Tensor] - ] = None, + custom_frame_mappings: ( + tuple[torch.Tensor, torch.Tensor, torch.Tensor] | None + ) = None, ) -> None: return @@ -415,9 +413,9 @@ def add_video_stream_abstract( def add_audio_stream_abstract( decoder: torch.Tensor, *, - stream_index: Optional[int] = None, - sample_rate: Optional[int] = None, - num_channels: Optional[int] = None, + stream_index: int | None = None, + sample_rate: int | None = None, + num_channels: int | None = None, ) -> None: return @@ -430,7 +428,7 @@ def seek_abstract(decoder: torch.Tensor, seconds: float) -> None: @register_fake("torchcodec_ns::get_next_frame") def get_next_frame_abstract( decoder: torch.Tensor, -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: # Images are 3 dimensions: height, width, channels. # The exact permutation depends on the constructor options passed in. 
image_size = [get_ctx().new_dynamic_size() for _ in range(3)] @@ -444,7 +442,7 @@ def get_next_frame_abstract( @register_fake("torchcodec_ns::get_frame_at_pts") def get_frame_at_pts_abstract( decoder: torch.Tensor, seconds: float -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(3)] return ( torch.empty(image_size), @@ -457,8 +455,8 @@ def get_frame_at_pts_abstract( def get_frames_by_pts_abstract( decoder: torch.Tensor, *, - timestamps: Union[torch.Tensor, List[float]], -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + timestamps: torch.Tensor | list[float], +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(4)] return ( torch.empty(image_size), @@ -470,7 +468,7 @@ def get_frames_by_pts_abstract( @register_fake("torchcodec_ns::get_frame_at_index") def get_frame_at_index_abstract( decoder: torch.Tensor, *, frame_index: int -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(3)] return ( torch.empty(image_size), @@ -481,8 +479,8 @@ def get_frame_at_index_abstract( @register_fake("torchcodec_ns::get_frames_at_indices") def get_frames_at_indices_abstract( - decoder: torch.Tensor, *, frame_indices: Union[torch.Tensor, List[int]] -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + decoder: torch.Tensor, *, frame_indices: torch.Tensor | list[int] +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(4)] return ( torch.empty(image_size), @@ -497,8 +495,8 @@ def get_frames_in_range_abstract( *, start: int, stop: int, - step: Optional[int] = None, -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + step: int | None = None, +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(4)] return ( torch.empty(image_size), @@ -513,7 +511,7 @@ def get_frames_by_pts_in_range_abstract( *, start_seconds: float, stop_seconds: float, -) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(4)] return ( torch.empty(image_size), @@ -527,8 +525,8 @@ def get_frames_by_pts_in_range_audio_abstract( decoder: torch.Tensor, *, start_seconds: float, - stop_seconds: Optional[float] = None, -) -> Tuple[torch.Tensor, torch.Tensor]: + stop_seconds: float | None = None, +) -> tuple[torch.Tensor, torch.Tensor]: image_size = [get_ctx().new_dynamic_size() for _ in range(4)] return (torch.empty(image_size), torch.empty([], dtype=torch.float)) diff --git a/src/torchcodec/_frame.py b/src/torchcodec/_frame.py index b5d7d9d5a..87e758ae5 100644 --- a/src/torchcodec/_frame.py +++ b/src/torchcodec/_frame.py @@ -4,9 +4,10 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
+
 import dataclasses
 from dataclasses import dataclass
-from typing import Iterable, Iterator, Union
+from typing import Iterable, Iterator
 
 from torch import Tensor
 
@@ -46,7 +47,7 @@ def __post_init__(self):
         self.pts_seconds = float(self.pts_seconds)
         self.duration_seconds = float(self.duration_seconds)
 
-    def __iter__(self) -> Iterator[Union[Tensor, float]]:
+    def __iter__(self) -> Iterator[Tensor | float]:
         for field in dataclasses.fields(self):
             yield getattr(self, field.name)
 
@@ -137,7 +138,7 @@ def __post_init__(self):
         self.pts_seconds = float(self.pts_seconds)
         self.sample_rate = int(self.sample_rate)
 
-    def __iter__(self) -> Iterator[Union[Tensor, float]]:
+    def __iter__(self) -> Iterator[Tensor | float]:
         for field in dataclasses.fields(self):
             yield getattr(self, field.name)
 
diff --git a/src/torchcodec/_samplers/video_clip_sampler.py b/src/torchcodec/_samplers/video_clip_sampler.py
index 343728393..4a730d801 100644
--- a/src/torchcodec/_samplers/video_clip_sampler.py
+++ b/src/torchcodec/_samplers/video_clip_sampler.py
@@ -4,11 +4,12 @@
 # This source code is licensed under the BSD-style license found in the
 # LICENSE file in the root directory of this source tree.
 
+
 import abc
 import json
 import sys
 from dataclasses import dataclass, field
-from typing import Any, Dict, List, Tuple, Union
+from typing import Any
 
 import torch
 from torch import nn, Tensor
@@ -82,7 +83,7 @@ class TimeBasedSamplerArgs(SamplerArgs):
     sample_start_second: float = 0.0
     sample_end_second: float = float("inf")
     sample_per_second: float = 0.0
-    target_sample_start_second: List[float] = field(default_factory=lambda: [])
+    target_sample_start_second: list[float] = field(default_factory=lambda: [])
 
 
 @dataclass
@@ -117,21 +118,21 @@ def __init__(
         self,
         video_args: VideoArgs,
         sampler_args: SamplerArgs,
-        decoder_args: Union[None, DecoderArgs] = None,
+        decoder_args: DecoderArgs | None = None,
     ) -> None:
         super().__init__()
         self.video_args = video_args
         self.sampler_args = sampler_args
         self.decoder_args = DecoderArgs() if decoder_args is None else decoder_args
 
-    def forward(self, video_data: Tensor) -> Union[List[Any]]:
+    def forward(self, video_data: Tensor) -> list[Any]:
         """Sample video clips from the video data
 
         Args:
             video_data (`Tensor`): The video data
 
         Return
-            clips (` List[List[Tensor]]`): List of clips, where each clip is a list of Tensors, each tensor represents a frame image.
+            clips (`list[list[Tensor]]`): List of clips, where each clip is a list of Tensors, each representing a frame image.
 
         """
 
@@ -151,7 +152,7 @@ def forward(self, video_data: Tensor) -> Union[List[Any]]:
             num_threads=self.decoder_args.num_threads,
         )
 
-        clips: List[Any] = []
+        clips: list[Any] = []
         # Cast sampler args to be time based or index based
         if isinstance(self.sampler_args, TimeBasedSamplerArgs):
             time_based_sampler_args = self.sampler_args
@@ -179,8 +180,8 @@ def _get_clips_for_index_based_sampling(
         self,
         video_decoder: Tensor,
         index_based_sampler_args: IndexBasedSamplerArgs,
-        metadata_json: Dict[str, Any],
-    ) -> List[Tensor]:
+        metadata_json: dict[str, Any],
+    ) -> list[Tensor]:
         """Get clips for index based sampling, the sampling is done in 3 steps:
         1. Compute clip_start_idxs based on the sampler type and the sampler args;
         2.
For each clip, given clip_start_idx, video_frame_dilation, frames_per_clip, get indexes for all frames
@@ -189,10 +190,10 @@ def _get_clips_for_index_based_sampling(
 
         Args:
             video_decoder (`Tensor`): The video decoder
             index_based_sampler_args (`IndexBasedSamplerArgs`): The index based sampler args
-            metadata_json (`Dict[str, Any]`): The metadata of the video in json format
+            metadata_json (`dict[str, Any]`): The metadata of the video in json format
 
         Returns:
-            clips (` List[Tensor]`): List of clips, where each clip is a Tensor represents list of frames, Tensor shape default is NCHW.
+            clips (`list[Tensor]`): List of clips, where each clip is a Tensor representing a list of frames; the default Tensor shape is NCHW.
 
         """
         sample_start_index = max(0, index_based_sampler_args.sample_start_index)
@@ -226,7 +227,7 @@ def _get_clips_for_index_based_sampling(
                 clip_start_idx + i * index_based_sampler_args.video_frame_dilation
                 for i in range(index_based_sampler_args.frames_per_clip)
             ]
-            # Need torch.stack to convert List[Tensor[int]] into 1D Tensor[int]
+            # Need torch.stack to convert list[Tensor[int]] into 1D Tensor[int]
             batch_indexes = torch.stack(batch_indexes)
             frames, *_ = get_frames_at_indices(
                 video_decoder,
@@ -238,18 +239,18 @@ def _get_clips_for_index_based_sampling(
 
     def _get_start_seconds(
         self,
-        metadata_json: Dict[str, Any],
+        metadata_json: dict[str, Any],
         time_based_sampler_args: TimeBasedSamplerArgs,
-    ) -> List[float]:
+    ) -> list[float]:
         """Get start seconds for each clip.
         Given different sampler type, the API returns different clip start seconds.
 
         Args:
-            metadata_json (`Dict[str, Any]`): The metadata of the video in json format
+            metadata_json (`dict[str, Any]`): The metadata of the video in json format
             time_based_sampler_args: (`TimeBasedSamplerArgs`): The time based sampler args
 
         Returns:
-            (`List[float]`): List of the sampled clip start position in seconds
+            (`list[float]`): List of the sampled clip start positions in seconds
 
         """
         video_duration_in_seconds = metadata_json["durationSecondsFromHeader"]
@@ -277,7 +278,7 @@ def _get_start_seconds(
                 "Cannot get clips because video duration is shorter than the clip duration!"
             )
         sampler_type = time_based_sampler_args.sampler_type
-        clip_starts_in_seconds: List[float] = []
+        clip_starts_in_seconds: list[float] = []
         sample_start_second = max(
             time_based_sampler_args.sample_start_second,
             beginStreamSecondsFromContent,
@@ -306,7 +307,7 @@ def _get_start_seconds(
 
     def _get_clip_with_start_second(
         self, start_second: float, video_decoder: Tensor, video_frame_dilation: int
-    ) -> List[Tensor]:
+    ) -> list[Tensor]:
         """Get clip with start second.
 
         Args:
@@ -315,7 +316,7 @@ def _get_clip_with_start_second(
             `video_frame_dilation` (`int`): The video frame dilation, by default it's 1.
 
         Returns:
-            `clip` (`List[Tensor]`): clip is list of frame tensor. Dimension of each frame tensor is user specified, by default it's HWC.
+            `clip` (`list[Tensor]`): A list of frame tensors. The dimensions of each frame tensor are user specified; by default it's HWC.
         """
         seek_to_pts(video_decoder, start_second)
         frames_needed_per_clip = (
@@ -332,7 +333,7 @@ def _get_clip_with_start_second(
 
     def _compute_frame_width_height(
         self, ori_width: int, ori_height: int
-    ) -> Tuple[int, int]:
+    ) -> tuple[int, int]:
         """Compute output frame width and height
            desired_width, desired_height, desired_min_dimension, desired_max_dimension, (`int`): Together decide the size of the decoded video clips. (Default: `0`).
Note that the desired_width/desired_height parameters are mutually exclusive with desired_min_dimension/desired_max_dimension parameters. @@ -364,7 +365,7 @@ def _compute_frame_width_height( ori_height (`int`): Original height of the video Returns: - (`Tuple[int, int]`): output frame width and height + (`tuple[int, int]`): output frame width and height """ width_height_ratio = ori_width / ori_height height_width_ratio = ori_height / ori_width diff --git a/src/torchcodec/decoders/_audio_decoder.py b/src/torchcodec/decoders/_audio_decoder.py index 9d9f13717..e1d0e0461 100644 --- a/src/torchcodec/decoders/_audio_decoder.py +++ b/src/torchcodec/decoders/_audio_decoder.py @@ -4,9 +4,9 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. + import io from pathlib import Path -from typing import Optional, Union import torch from torch import Tensor @@ -54,11 +54,11 @@ class AudioDecoder: def __init__( self, - source: Union[str, Path, io.RawIOBase, io.BufferedReader, bytes, Tensor], + source: str | Path | io.RawIOBase | io.BufferedReader | bytes | Tensor, *, - stream_index: Optional[int] = None, - sample_rate: Optional[int] = None, - num_channels: Optional[int] = None, + stream_index: int | None = None, + sample_rate: int | None = None, + num_channels: int | None = None, ): torch._C._log_api_usage_once("torchcodec.decoders.AudioDecoder") self._decoder = create_decoder(source=source, seek_mode="approximate") @@ -108,7 +108,7 @@ def get_all_samples(self) -> AudioSamples: return self.get_samples_played_in_range() def get_samples_played_in_range( - self, start_seconds: float = 0.0, stop_seconds: Optional[float] = None + self, start_seconds: float = 0.0, stop_seconds: float | None = None ) -> AudioSamples: """Returns audio samples in the given range. diff --git a/src/torchcodec/decoders/_decoder_utils.py b/src/torchcodec/decoders/_decoder_utils.py index 2619acd24..27b745ea1 100644 --- a/src/torchcodec/decoders/_decoder_utils.py +++ b/src/torchcodec/decoders/_decoder_utils.py @@ -4,12 +4,13 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. + import contextvars import io from contextlib import contextmanager from pathlib import Path -from typing import Generator, Union +from typing import Generator from torch import Tensor from torchcodec import _core as core @@ -22,7 +23,7 @@ def create_decoder( *, - source: Union[str, Path, io.RawIOBase, io.BufferedReader, bytes, Tensor], + source: str | Path | io.RawIOBase | io.BufferedReader | bytes | Tensor, seek_mode: str, ) -> Tensor: if isinstance(source, str): diff --git a/src/torchcodec/decoders/_video_decoder.py b/src/torchcodec/decoders/_video_decoder.py index 9c2727bad..45d6dfabb 100644 --- a/src/torchcodec/decoders/_video_decoder.py +++ b/src/torchcodec/decoders/_video_decoder.py @@ -4,11 +4,12 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
+ import io import json import numbers from pathlib import Path -from typing import Literal, Optional, Sequence, Tuple, Union +from typing import Literal, Sequence import torch from torch import device as torch_device, nn, Tensor @@ -105,17 +106,17 @@ class VideoDecoder: def __init__( self, - source: Union[str, Path, io.RawIOBase, io.BufferedReader, bytes, Tensor], + source: str | Path | io.RawIOBase | io.BufferedReader | bytes | Tensor, *, - stream_index: Optional[int] = None, + stream_index: int | None = None, dimension_order: Literal["NCHW", "NHWC"] = "NCHW", num_ffmpeg_threads: int = 1, - device: Optional[Union[str, torch_device]] = None, + device: str | torch_device | None = None, seek_mode: Literal["exact", "approximate"] = "exact", - transforms: Optional[Sequence[Union[DecoderTransform, nn.Module]]] = None, - custom_frame_mappings: Optional[ - Union[str, bytes, io.RawIOBase, io.BufferedReader] - ] = None, + transforms: Sequence[DecoderTransform | nn.Module] | None = None, + custom_frame_mappings: ( + str | bytes | io.RawIOBase | io.BufferedReader | None + ) = None, ): torch._C._log_api_usage_once("torchcodec.decoders.VideoDecoder") allowed_seek_modes = ("exact", "approximate") @@ -205,7 +206,7 @@ def _getitem_slice(self, key: slice) -> Tensor: ) return frame_data - def __getitem__(self, key: Union[numbers.Integral, slice]) -> Tensor: + def __getitem__(self, key: numbers.Integral | slice) -> Tensor: """Return frame or frames as tensors, at the given index or range. .. note:: @@ -262,7 +263,7 @@ def get_frame_at(self, index: int) -> Frame: duration_seconds=duration_seconds.item(), ) - def get_frames_at(self, indices: Union[torch.Tensor, list[int]]) -> FrameBatch: + def get_frames_at(self, indices: torch.Tensor | list[int]) -> FrameBatch: """Return frames at the given indices. Args: @@ -339,9 +340,7 @@ def get_frame_played_at(self, seconds: float) -> Frame: duration_seconds=duration_seconds.item(), ) - def get_frames_played_at( - self, seconds: Union[torch.Tensor, list[float]] - ) -> FrameBatch: + def get_frames_played_at(self, seconds: torch.Tensor | list[float]) -> FrameBatch: """Return frames played at the given timestamps in seconds. Args: @@ -404,8 +403,8 @@ def get_frames_played_in_range( def _get_and_validate_stream_metadata( *, decoder: Tensor, - stream_index: Optional[int] = None, -) -> Tuple[core._metadata.VideoStreamMetadata, int, float, float, int]: + stream_index: int | None = None, +) -> tuple[core._metadata.VideoStreamMetadata, int, float, float, int]: container_metadata = core.get_container_metadata(decoder) @@ -453,7 +452,7 @@ def _get_and_validate_stream_metadata( def _read_custom_frame_mappings( - custom_frame_mappings: Union[str, bytes, io.RawIOBase, io.BufferedReader] + custom_frame_mappings: str | bytes | io.RawIOBase | io.BufferedReader, ) -> tuple[Tensor, Tensor, Tensor]: """Parse custom frame mappings from JSON data and extract frame metadata. 
diff --git a/src/torchcodec/encoders/_audio_encoder.py b/src/torchcodec/encoders/_audio_encoder.py index fc8879cfa..769a98acc 100644 --- a/src/torchcodec/encoders/_audio_encoder.py +++ b/src/torchcodec/encoders/_audio_encoder.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import Optional, Union import torch from torch import Tensor @@ -44,11 +43,11 @@ def __init__(self, samples: Tensor, *, sample_rate: int): def to_file( self, - dest: Union[str, Path], + dest: str | Path, *, - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - sample_rate: Optional[int] = None, + bit_rate: int | None = None, + num_channels: int | None = None, + sample_rate: int | None = None, ) -> None: """Encode samples into a file. @@ -79,9 +78,9 @@ def to_tensor( self, format: str, *, - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - sample_rate: Optional[int] = None, + bit_rate: int | None = None, + num_channels: int | None = None, + sample_rate: int | None = None, ) -> Tensor: """Encode samples into raw bytes, as a 1D uint8 Tensor. @@ -115,9 +114,9 @@ def to_file_like( file_like, format: str, *, - bit_rate: Optional[int] = None, - num_channels: Optional[int] = None, - sample_rate: Optional[int] = None, + bit_rate: int | None = None, + num_channels: int | None = None, + sample_rate: int | None = None, ) -> None: """Encode samples into a file-like object. diff --git a/src/torchcodec/encoders/_video_encoder.py b/src/torchcodec/encoders/_video_encoder.py index 3fede6b8e..7e164e884 100644 --- a/src/torchcodec/encoders/_video_encoder.py +++ b/src/torchcodec/encoders/_video_encoder.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Any, Dict, Optional, Union +from typing import Any import torch from torch import Tensor @@ -34,13 +34,13 @@ def __init__(self, frames: Tensor, *, frame_rate: float): def to_file( self, - dest: Union[str, Path], + dest: str | Path, *, - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - preset: Optional[Union[str, int]] = None, - extra_options: Optional[Dict[str, Any]] = None, + codec: str | None = None, + pixel_format: str | None = None, + crf: int | float | None = None, + preset: str | int | None = None, + extra_options: dict[str, Any] | None = None, ) -> None: """Encode frames into a file. @@ -86,11 +86,11 @@ def to_tensor( self, format: str, *, - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - preset: Optional[Union[str, int]] = None, - extra_options: Optional[Dict[str, Any]] = None, + codec: str | None = None, + pixel_format: str | None = None, + crf: int | float | None = None, + preset: str | int | None = None, + extra_options: dict[str, Any] | None = None, ) -> Tensor: """Encode frames into raw bytes, as a 1D uint8 Tensor. @@ -139,11 +139,11 @@ def to_file_like( file_like, format: str, *, - codec: Optional[str] = None, - pixel_format: Optional[str] = None, - crf: Optional[Union[int, float]] = None, - preset: Optional[Union[str, int]] = None, - extra_options: Optional[Dict[str, Any]] = None, + codec: str | None = None, + pixel_format: str | None = None, + crf: int | float | None = None, + preset: str | int | None = None, + extra_options: dict[str, Any] | None = None, ) -> None: """Encode frames into a file-like object. 
diff --git a/src/torchcodec/samplers/_common.py b/src/torchcodec/samplers/_common.py index a129a4483..3a0bd94e4 100644 --- a/src/torchcodec/samplers/_common.py +++ b/src/torchcodec/samplers/_common.py @@ -1,8 +1,8 @@ -from typing import Callable, Union +from typing import Callable from torchcodec import FrameBatch -_LIST_OF_INT_OR_FLOAT = Union[list[int], list[float]] +_LIST_OF_INT_OR_FLOAT = list[int] | list[float] def _repeat_last_policy( diff --git a/src/torchcodec/samplers/_index_based.py b/src/torchcodec/samplers/_index_based.py index d8f107c5e..fdbabcfbc 100644 --- a/src/torchcodec/samplers/_index_based.py +++ b/src/torchcodec/samplers/_index_based.py @@ -1,4 +1,4 @@ -from typing import Literal, Optional +from typing import Literal import torch @@ -125,7 +125,7 @@ def _generic_index_based_sampler( num_frames_per_clip: int, num_indices_between_frames: int, sampling_range_start: int, - sampling_range_end: Optional[int], # interval is [start, end). + sampling_range_end: int | None, # interval is [start, end). # Important note: sampling_range_end defines the upper bound of where a clip # can *start*, not where a clip can end. policy: Literal["repeat_last", "wrap", "error"], @@ -192,7 +192,7 @@ def clips_at_random_indices( num_frames_per_clip: int = 1, num_indices_between_frames: int = 1, sampling_range_start: int = 0, - sampling_range_end: Optional[int] = None, # interval is [start, end). + sampling_range_end: int | None = None, # interval is [start, end). policy: Literal["repeat_last", "wrap", "error"] = "repeat_last", ) -> FrameBatch: # See docstring below @@ -216,7 +216,7 @@ def clips_at_regular_indices( num_frames_per_clip: int = 1, num_indices_between_frames: int = 1, sampling_range_start: int = 0, - sampling_range_end: Optional[int] = None, # interval is [start, end). + sampling_range_end: int | None = None, # interval is [start, end). policy: Literal["repeat_last", "wrap", "error"] = "repeat_last", ) -> FrameBatch: # See docstring below diff --git a/src/torchcodec/samplers/_time_based.py b/src/torchcodec/samplers/_time_based.py index d58114121..6fd1f9688 100644 --- a/src/torchcodec/samplers/_time_based.py +++ b/src/torchcodec/samplers/_time_based.py @@ -1,4 +1,4 @@ -from typing import Literal, Optional +from typing import Literal import torch @@ -151,13 +151,13 @@ def _generic_time_based_sampler( kind: Literal["random", "regular"], decoder, *, - num_clips: Optional[int], # mutually exclusive with seconds_between_clip_starts - seconds_between_clip_starts: Optional[float], + num_clips: int | None, # mutually exclusive with seconds_between_clip_starts + seconds_between_clip_starts: float | None, num_frames_per_clip: int, - seconds_between_frames: Optional[float], + seconds_between_frames: float | None, # None means "begining", which may not always be 0 - sampling_range_start: Optional[float], - sampling_range_end: Optional[float], # interval is [start, end). + sampling_range_start: float | None, + sampling_range_end: float | None, # interval is [start, end). policy: Literal["repeat_last", "wrap", "error"] = "repeat_last", ) -> FrameBatch: # Note: *everywhere*, sampling_range_end denotes the upper bound of where a @@ -232,10 +232,10 @@ def clips_at_random_timestamps( *, num_clips: int = 1, num_frames_per_clip: int = 1, - seconds_between_frames: Optional[float] = None, + seconds_between_frames: float | None = None, # None means "begining", which may not always be 0 - sampling_range_start: Optional[float] = None, - sampling_range_end: Optional[float] = None, # interval is [start, end). 
+ sampling_range_start: float | None = None, + sampling_range_end: float | None = None, # interval is [start, end). policy: Literal["repeat_last", "wrap", "error"] = "repeat_last", ) -> FrameBatch: # See docstring below @@ -258,10 +258,10 @@ def clips_at_regular_timestamps( *, seconds_between_clip_starts: float, num_frames_per_clip: int = 1, - seconds_between_frames: Optional[float] = None, + seconds_between_frames: float | None = None, # None means "begining", which may not always be 0 - sampling_range_start: Optional[float] = None, - sampling_range_end: Optional[float] = None, # interval is [start, end). + sampling_range_start: float | None = None, + sampling_range_end: float | None = None, # interval is [start, end). policy: Literal["repeat_last", "wrap", "error"] = "repeat_last", ) -> FrameBatch: # See docstring below diff --git a/src/torchcodec/transforms/_decoder_transforms.py b/src/torchcodec/transforms/_decoder_transforms.py index 6a99dd800..fbb98b667 100644 --- a/src/torchcodec/transforms/_decoder_transforms.py +++ b/src/torchcodec/transforms/_decoder_transforms.py @@ -4,9 +4,10 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. + from abc import ABC, abstractmethod from types import ModuleType -from typing import Optional, Sequence, Tuple, Union +from typing import Sequence import torch from torch import nn @@ -36,13 +37,11 @@ class DecoderTransform(ABC): """ @abstractmethod - def _make_transform_spec( - self, input_dims: Tuple[Optional[int], Optional[int]] - ) -> str: + def _make_transform_spec(self, input_dims: tuple[int | None, int | None]) -> str: """Makes the transform spec that is used by the `VideoDecoder`. Args: - input_dims (Tuple[Optional[int], Optional[int]]): The dimensions of + input_dims (tuple[int | None, int | None]): The dimensions of the input frame in the form (height, width). We cannot know the dimensions at object construction time because it's dependent on the video being decoded and upstream transforms in the same @@ -64,7 +63,7 @@ def _make_transform_spec( """ pass - def _get_output_dims(self) -> Optional[Tuple[Optional[int], Optional[int]]]: + def _get_output_dims(self) -> tuple[int | None, int | None] | None: """Get the dimensions of the output frame. Transforms that change the frame dimensions need to override this @@ -72,7 +71,7 @@ def _get_output_dims(self) -> Optional[Tuple[Optional[int], Optional[int]]]: this default implementation. Returns: - Optional[Tuple[Optional[int], Optional[int]]]: The output dimensions. + tuple[int | None, int | None] | None: The output dimensions. - None: The output dimensions are the same as the input dimensions. - (int, int): The (height, width) of the output frame. 
""" @@ -108,12 +107,10 @@ def __init__(self, size: Sequence[int]): ) self.size = size - def _make_transform_spec( - self, input_dims: Tuple[Optional[int], Optional[int]] - ) -> str: + def _make_transform_spec(self, input_dims: tuple[int | None, int | None]) -> str: return f"resize, {self.size[0]}, {self.size[1]}" - def _get_output_dims(self) -> Optional[Tuple[Optional[int], Optional[int]]]: + def _get_output_dims(self) -> tuple[int | None, int | None] | None: return (self.size[0], self.size[1]) @classmethod @@ -158,12 +155,10 @@ def __init__(self, size: Sequence[int]): ) self.size = size - def _make_transform_spec( - self, input_dims: Tuple[Optional[int], Optional[int]] - ) -> str: + def _make_transform_spec(self, input_dims: tuple[int | None, int | None]) -> str: return f"center_crop, {self.size[0]}, {self.size[1]}" - def _get_output_dims(self) -> Optional[Tuple[Optional[int], Optional[int]]]: + def _get_output_dims(self) -> tuple[int | None, int | None] | None: return (self.size[0], self.size[1]) @classmethod @@ -213,9 +208,7 @@ def __init__(self, size: Sequence[int]): ) self.size = size - def _make_transform_spec( - self, input_dims: Tuple[Optional[int], Optional[int]] - ) -> str: + def _make_transform_spec(self, input_dims: tuple[int | None, int | None]) -> str: height, width = input_dims if height is None: raise ValueError( @@ -242,7 +235,7 @@ def _make_transform_spec( return f"crop, {self.size[0]}, {self.size[1]}, {left}, {top}" - def _get_output_dims(self) -> Optional[Tuple[Optional[int], Optional[int]]]: + def _get_output_dims(self) -> tuple[int | None, int | None] | None: return (self.size[0], self.size[1]) @classmethod @@ -285,8 +278,8 @@ def _from_torchvision( def _make_transform_specs( - transforms: Optional[Sequence[Union[DecoderTransform, nn.Module]]], - input_dims: Tuple[Optional[int], Optional[int]], + transforms: Sequence[DecoderTransform | nn.Module] | None, + input_dims: tuple[int | None, int | None], ) -> str: """Given a sequence of transforms, turn those into the specification string the core API expects. @@ -347,10 +340,10 @@ def _make_transform_specs( # dimensions from its input dimensions. We store these with the converted # transform, to be all used together when we generate the specs. converted_transforms: list[ - Tuple[ + tuple[ DecoderTransform, # A (height, width) pair where the values may be missing. - Tuple[Optional[int], Optional[int]], + tuple[int | None, int | None], ] ] = [] curr_input_dims = input_dims diff --git a/test/generate_reference_resources.py b/test/generate_reference_resources.py index 3821c9299..7d28e9993 100644 --- a/test/generate_reference_resources.py +++ b/test/generate_reference_resources.py @@ -6,7 +6,6 @@ import subprocess from pathlib import Path -from typing import Optional import numpy as np @@ -39,7 +38,7 @@ def generate_frame_by_index( *, frame_index: int, stream_index: int, - filters: Optional[str] = None, + filters: str | None = None, ) -> None: # Note that we are using 0-based index naming. As a result, we are # generating files one-by-one, giving the actual file name that we want. 
diff --git a/test/utils.py b/test/utils.py
index fb2d84483..39fa96597 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -6,7 +6,6 @@
 import sys
 from dataclasses import dataclass, field
-from typing import Dict, List, Optional, Union
 
 import numpy as np
 import pytest
@@ -108,7 +107,7 @@ def get_python_version() -> tuple[int, int]:
     return (sys.version_info.major, sys.version_info.minor)
 
 
-def cuda_version_used_for_building_torch() -> Optional[tuple[int, int]]:
+def cuda_version_used_for_building_torch() -> tuple[int, int] | None:
     # Return the CUDA version that was used to build PyTorch. That's not always
     # the same as the CUDA version that is currently installed on the running
     # machine, which is what we actually want. On the CI though, these are the
@@ -230,10 +229,10 @@ class TestContainerFile:
     filename: str
     default_stream_index: int
 
-    stream_infos: Dict[int, Union[TestVideoStreamInfo, TestAudioStreamInfo]]
-    frames: Dict[int, Dict[int, TestFrameInfo]]
-    _custom_frame_mappings_data: Dict[
-        int, Optional[tuple[torch.Tensor, torch.Tensor, torch.Tensor]]
+    stream_infos: dict[int, TestVideoStreamInfo | TestAudioStreamInfo]
+    frames: dict[int, dict[int, TestFrameInfo]]
+    _custom_frame_mappings_data: dict[
+        int, tuple[torch.Tensor, torch.Tensor, torch.Tensor] | None
     ] = field(default_factory=dict)
 
     def __post_init__(self):
@@ -282,7 +281,7 @@ def to_tensor(self) -> torch.Tensor:
         return torch.from_numpy(arr)
 
     def get_frame_data_by_index(
-        self, idx: int, *, stream_index: Optional[int] = None
+        self, idx: int, *, stream_index: int | None = None
     ) -> torch.Tensor:
         raise NotImplementedError("Override in child classes")
 
@@ -292,7 +291,7 @@ def get_frame_data_by_range(
         self,
         start: int,
         stop: int,
         step: int = 1,
         *,
-        stream_index: Optional[int] = None,
+        stream_index: int | None = None,
     ) -> torch.Tensor:
         raise NotImplementedError("Override in child classes")
 
@@ -302,7 +301,7 @@ def get_pts_seconds_by_range(
         self,
         start: int,
         stop: int,
         step: int = 1,
         *,
-        stream_index: Optional[int] = None,
+        stream_index: int | None = None,
     ) -> torch.Tensor:
         if stream_index is None:
             stream_index = self.default_stream_index
@@ -318,7 +317,7 @@ def get_duration_seconds_by_range(
         self,
         start: int,
         stop: int,
         step: int = 1,
         *,
-        stream_index: Optional[int] = None,
+        stream_index: int | None = None,
     ) -> torch.Tensor:
         if stream_index is None:
             stream_index = self.default_stream_index
@@ -330,7 +329,7 @@ def get_duration_seconds_by_range(
         return torch.tensor(all_durations, dtype=torch.float64)
 
     def get_frame_info(
-        self, idx: int, *, stream_index: Optional[int] = None
+        self, idx: int, *, stream_index: int | None = None
     ) -> TestFrameInfo:
         if stream_index is None:
             stream_index = self.default_stream_index
@@ -339,7 +338,7 @@ def get_frame_info(
     # This function is used to get the frame mappings for the custom_frame_mappings seek mode.
def get_custom_frame_mappings( - self, stream_index: Optional[int] = None + self, stream_index: int | None = None ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: if stream_index is None: stream_index = self.default_stream_index @@ -383,7 +382,7 @@ class TestVideo(TestContainerFile): """Base class for the *video* streams of a video container""" def get_base_path_by_index( - self, idx: int, *, stream_index: int, filters: Optional[str] = None + self, idx: int, *, stream_index: int, filters: str | None = None ) -> pathlib.Path: stream_and_frame = f"stream{stream_index}.frame{idx:06d}" if filters is not None: @@ -397,8 +396,8 @@ def get_frame_data_by_index( self, idx: int, *, - stream_index: Optional[int] = None, - filters: Optional[str] = None, + stream_index: int | None = None, + filters: str | None = None, ) -> torch.Tensor: if stream_index is None: stream_index = self.default_stream_index @@ -415,7 +414,7 @@ def get_frame_data_by_range( stop: int, step: int = 1, *, - stream_index: Optional[int] = None, + stream_index: int | None = None, ) -> torch.Tensor: tensors = [ self.get_frame_data_by_index(i, stream_index=stream_index) @@ -441,19 +440,19 @@ def empty_chw_tensor(self) -> torch.Tensor: [0, self.num_color_channels, self.height, self.width], dtype=torch.uint8 ) - def get_width(self, *, stream_index: Optional[int] = None) -> int: + def get_width(self, *, stream_index: int | None = None) -> int: if stream_index is None: stream_index = self.default_stream_index return self.stream_infos[stream_index].width - def get_height(self, *, stream_index: Optional[int] = None) -> int: + def get_height(self, *, stream_index: int | None = None) -> int: if stream_index is None: stream_index = self.default_stream_index return self.stream_infos[stream_index].height - def get_num_color_channels(self, *, stream_index: Optional[int] = None) -> int: + def get_num_color_channels(self, *, stream_index: int | None = None) -> int: if stream_index is None: stream_index = self.default_stream_index @@ -617,10 +616,10 @@ class TestAudio(TestContainerFile): """Base class for the *audio* streams of a container (potentially a video), or a pure audio file""" - stream_infos: Dict[int, TestAudioStreamInfo] + stream_infos: dict[int, TestAudioStreamInfo] # stream_index -> list of 2D frame tensors of shape (num_channels, num_samples_in_that_frame) # num_samples_in_that_frame isn't necessarily constant for a given stream. - _reference_frames: Dict[int, List[torch.Tensor]] = field(default_factory=dict) + _reference_frames: dict[int, list[torch.Tensor]] = field(default_factory=dict) # Storing each individual frame is too expensive for audio, because there's # a massive overhead in the binary format saved by pytorch. 
Saving all the @@ -644,7 +643,7 @@ def __post_init__(self): ) def get_frame_data_by_index( - self, idx: int, *, stream_index: Optional[int] = None + self, idx: int, *, stream_index: int | None = None ) -> torch.Tensor: if stream_index is None: stream_index = self.default_stream_index @@ -657,7 +656,7 @@ def get_frame_data_by_range( stop: int, step: int = 1, *, - stream_index: Optional[int] = None, + stream_index: int | None = None, ) -> torch.Tensor: tensors = [ self.get_frame_data_by_index(i, stream_index=stream_index) @@ -666,7 +665,7 @@ def get_frame_data_by_range( return torch.cat(tensors, dim=-1) def get_frame_index( - self, *, pts_seconds: float, stream_index: Optional[int] = None + self, *, pts_seconds: float, stream_index: int | None = None ) -> int: if stream_index is None: stream_index = self.default_stream_index
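
Note on the migration pattern (illustration only, not part of the patch): the changes above mechanically rewrite `typing.Optional`/`typing.Union` and the `typing.List`/`Dict`/`Tuple` aliases into PEP 604 unions (`X | Y`) and PEP 585 builtin generics (`list[int]`, `dict[str, Any]`), hence the `requires-python` and Black `target-version` bumps: runtime union expressions such as `_LIST_OF_INT_OR_FLOAT = list[int] | list[float]` in `samplers/_common.py` only evaluate on Python 3.10+. A minimal sketch of the equivalences, with hypothetical function names (`load_old`/`load_new` are not part of the codebase); the one subtle point is that `Optional[...]` wraps the whole annotation, so `| None` stays at the top level, as in the `test/utils.py` annotations above:

from pathlib import Path
from typing import Optional, Union

# Old spellings (pre-3.10 compatible):
def load_old(
    src: Union[str, Path], retries: Optional[int] = None
) -> Optional[tuple[int, int]]:
    ...

# New spellings (3.10+), equivalent per PEP 604:
def load_new(src: str | Path, retries: int | None = None) -> tuple[int, int] | None:
    ...

# The old and new forms compare equal at runtime:
assert Union[str, Path] == (str | Path)
assert Optional[tuple[int, int]] == (tuple[int, int] | None)

# Pitfall: `| None` must cover the whole annotation, exactly like Optional[...]:
#   tuple[int, int] | None  -- the whole pair may be None (== Optional[tuple[int, int]])
#   tuple[int, int | None]  -- the pair always exists; only its second element may be None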