2424pool manager.
2525"""
2626
27+ from __future__ import annotations
28+
2729# %%
2830# Let's first define some utility functions for benchmarking and data
2931# processing. We'll also download a video and create a longer version by
3032# repeating it multiple times. This simulates working with long videos that
3133# require efficient processing. You can ignore that part and jump right below to
3234# :ref:`start_parallel_decoding`.
3335
34- from typing import List
3536import torch
3637import requests
3738import tempfile
@@ -74,16 +75,16 @@ def report_stats(times, unit="s"):
7475 return med
7576
7677
def split_indices(indices: list[int], num_chunks: int) -> list[list[int]]:
    """Split a list of indices into ``num_chunks`` contiguous, balanced chunks.

    The chunks preserve the input order, so concatenating them gives back
    ``indices``. Chunk sizes differ by at most one element: the first
    ``len(indices) % num_chunks`` chunks receive one extra index. Spreading
    the remainder (instead of appending it all to the last chunk) keeps the
    per-worker workload even when the chunks are processed in parallel.

    Args:
        indices: The indices to split.
        num_chunks: Number of chunks to produce. Must be >= 1.

    Returns:
        A list of exactly ``num_chunks`` lists. Some of them are empty when
        ``num_chunks`` is larger than ``len(indices)``.

    Raises:
        ValueError: If ``num_chunks`` is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError(f"num_chunks must be a positive integer, got {num_chunks}.")

    base_size, remainder = divmod(len(indices), num_chunks)
    chunks = []
    start = 0

    for i in range(num_chunks):
        # The first `remainder` chunks absorb one leftover index each.
        stop = start + base_size + (1 if i < remainder else 0)
        chunks.append(indices[start:stop])
        start = stop

    return chunks
8889
8990
@@ -96,18 +97,22 @@ def generate_long_video(temp_dir: str):
9697 raise RuntimeError (f"Failed to download video. { response .status_code = } ." )
9798
9899 short_video_path = Path (temp_dir ) / "short_video.mp4"
99- with open (short_video_path , 'wb' ) as f :
100+ with open (short_video_path , "wb" ) as f :
100101 for chunk in response .iter_content ():
101102 f .write (chunk )
102103
103104 # Create a longer video by repeating the short one 50 times
104105 long_video_path = Path (temp_dir ) / "long_video.mp4"
105106 ffmpeg_command = [
106- "ffmpeg" , "-y" ,
107- "-stream_loop" , "49" , # repeat video 50 times
108- "-i" , str (short_video_path ),
109- "-c" , "copy" ,
110- str (long_video_path )
107+ "ffmpeg" ,
108+ "-y" ,
109+ "-stream_loop" ,
110+ "49" , # repeat video 50 times
111+ "-i" ,
112+ str (short_video_path ),
113+ "-c" ,
114+ "copy" ,
115+ str (long_video_path ),
111116 ]
112117 subprocess .run (ffmpeg_command , check = True , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
113118
@@ -122,7 +127,9 @@ def generate_long_video(temp_dir: str):
122127
123128short_duration = timedelta (seconds = VideoDecoder (short_video_path ).metadata .duration_seconds )
124129long_duration = timedelta (seconds = metadata .duration_seconds )
125- print (f"Original video duration: { int (short_duration .total_seconds () // 60 )} m{ int (short_duration .total_seconds () % 60 ):02d} s" )
130+ print (
131+ f"Original video duration: { int (short_duration .total_seconds () // 60 )} m{ int (short_duration .total_seconds () % 60 ):02d} s"
132+ )
126133print (f"Long video duration: { int (long_duration .total_seconds () // 60 )} m{ int (long_duration .total_seconds () % 60 ):02d} s" )
127134print (f"Video resolution: { metadata .width } x{ metadata .height } " )
128135print (f"Average FPS: { metadata .average_fps :.1f} " )
@@ -155,7 +162,8 @@ def generate_long_video(temp_dir: str):
155162# Let's start with a sequential approach as our baseline. This processes
156163# frames one by one without any parallelization.
157164
def decode_sequentially(indices: list[int], video_path=long_video_path):
    """Decode the requested frames with one decoder and no parallelism.

    This is the sequential baseline of the tutorial.

    Args:
        indices: Frame indices to fetch.
        video_path: Video to decode; defaults to the module-level
            ``long_video_path``.

    Returns:
        Whatever ``VideoDecoder.get_frames_at`` returns for ``indices``.
    """
    # A fresh decoder is built on every call, using approximate seek mode.
    return VideoDecoder(video_path, seek_mode="approximate").get_frames_at(indices)
@@ -173,11 +181,8 @@ def decode_sequentially(indices: List[int], video_path=long_video_path):
173181# via the ``num_ffmpeg_threads`` parameter. This approach uses multiple
174182# threads within FFmpeg itself to accelerate decoding operations.
175183
def decode_with_ffmpeg_parallelism(
    indices: list[int],
    num_threads: int,
    video_path=long_video_path,
):
    """Decode frames with a single decoder that uses FFmpeg's own threads.

    Args:
        indices: Frame indices to fetch.
        num_threads: Forwarded to the decoder as ``num_ffmpeg_threads``.
        video_path: Video to decode; defaults to the module-level
            ``long_video_path``.

    Returns:
        Whatever ``VideoDecoder.get_frames_at`` returns for ``indices``.
    """
    # Parallelism is delegated entirely to FFmpeg; Python stays single-threaded here.
    threaded_decoder = VideoDecoder(
        video_path,
        num_ffmpeg_threads=num_threads,
        seek_mode="approximate",
    )
    frames = threaded_decoder.get_frames_at(indices)
    return frames
@@ -197,11 +202,8 @@ def decode_with_ffmpeg_parallelism(
197202#
198203# Process-based parallelism distributes work across multiple Python processes.
199204
200- def decode_with_multiprocessing (
201- indices : List [int ],
202- num_processes : int ,
203- video_path = long_video_path
204- ):
205+
206+ def decode_with_multiprocessing (indices : list [int ], num_processes : int , video_path = long_video_path ):
205207 """Decode frames using multiple processes with joblib."""
206208 chunks = split_indices (indices , num_chunks = num_processes )
207209
@@ -226,11 +228,8 @@ def decode_with_multiprocessing(
226228# Thread-based parallelism uses multiple threads within a single process.
227229# TorchCodec releases the GIL, so this can be very effective.
228230
229- def decode_with_multithreading (
230- indices : List [int ],
231- num_threads : int ,
232- video_path = long_video_path
233- ):
231+
232+ def decode_with_multithreading (indices : list [int ], num_threads : int , video_path = long_video_path ):
234233 """Decode frames using multiple threads with joblib."""
235234 chunks = split_indices (indices , num_chunks = num_threads )
236235
@@ -261,4 +260,5 @@ def decode_with_multithreading(
261260
262261# %%
263262import shutil
263+
264264shutil .rmtree (temp_dir )
0 commit comments