# Download tuning knobs, read once at import time from the [M3U8_DOWNLOAD]
# section of the project configuration (single-quoted keys for consistency).
DEFAULT_VIDEO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_video_workers')
DEFAULT_AUDIO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_audio_workers')
SEGMENT_MAX_TIMEOUT = config_manager.get_int('M3U8_DOWNLOAD', 'segment_timeout')
# Global cap on how many segments to download per stream; 0 means "no limit"
# (see MPD_Segments.__init__, which treats a non-positive value as None).
LIMIT_SEGMENT = config_manager.get_int('M3U8_DOWNLOAD', 'limit_segment')
2526
2627
2728class MPD_Segments :
@@ -38,7 +39,13 @@ def __init__(self, tmp_folder: str, representation: dict, pssh: str = None, limi
3839 self .tmp_folder = tmp_folder
3940 self .selected_representation = representation
4041 self .pssh = pssh
41- self .limit_segments = limit_segments
42+
43+ # Use LIMIT_SEGMENT from config if limit_segments is not specified or is 0
44+ if limit_segments is None or limit_segments == 0 :
45+ self .limit_segments = LIMIT_SEGMENT if LIMIT_SEGMENT > 0 else None
46+ else :
47+ self .limit_segments = limit_segments
48+
4249 self .download_interrupted = False
4350 self .info_nFailed = 0
4451
@@ -50,6 +57,10 @@ def __init__(self, tmp_folder: str, representation: dict, pssh: str = None, limi
5057 # Progress
5158 self ._last_progress_update = 0
5259 self ._progress_update_interval = 0.1
60+
61+ # Segment tracking
62+ self .segment_files = {}
63+ self .segments_lock = asyncio .Lock ()
5364
5465 def get_concat_path (self , output_dir : str = None ):
5566 """
@@ -78,7 +89,6 @@ def download_streams(self, output_dir: str = None, description: str = "DASH"):
7889 if self .limit_segments is not None :
7990 orig_count = len (self .selected_representation .get ('segment_urls' , []))
8091 if orig_count > self .limit_segments :
81-
8292 # Limit segment URLs
8393 self .selected_representation ['segment_urls' ] = self .selected_representation ['segment_urls' ][:self .limit_segments ]
8494 print (f"[yellow]Limiting segments from { orig_count } to { self .limit_segments } " )
@@ -113,6 +123,9 @@ async def download_segments(self, output_dir: str = None, concurrent_downloads:
113123
114124 os .makedirs (output_dir or self .tmp_folder , exist_ok = True )
115125 concat_path = os .path .join (output_dir or self .tmp_folder , f"{ rep_id } _encrypted.m4s" )
126+
127+ temp_dir = os .path .join (output_dir or self .tmp_folder , f"{ rep_id } _segments" )
128+ os .makedirs (temp_dir , exist_ok = True )
116129
117130 # Determine stream type (video/audio) for progress bar
118131 stream_type = description
@@ -132,7 +145,7 @@ async def download_segments(self, output_dir: str = None, concurrent_downloads:
132145 # Initialize estimator
133146 estimator = M3U8_Ts_Estimator (total_segments = len (segment_urls ) + 1 )
134147
135- results = [ None ] * len ( segment_urls )
148+ self . segment_files = {}
136149 self .downloaded_segments = set ()
137150 self .info_nFailed = 0
138151 self .download_interrupted = False
@@ -148,25 +161,25 @@ async def download_segments(self, output_dir: str = None, concurrent_downloads:
148161 # Download init segment
149162 await self ._download_init_segment (client , init_url , concat_path , estimator , progress_bar )
150163
151- # Download all segments (first batch)
164+ # Download all segments (first batch) - writes to temp files
152165 await self ._download_segments_batch (
153- client , segment_urls , results , semaphore , REQUEST_MAX_RETRY , estimator , progress_bar
166+ client , segment_urls , temp_dir , semaphore , REQUEST_MAX_RETRY , estimator , progress_bar
154167 )
155168
156169 # Retry failed segments
157170 await self ._retry_failed_segments (
158- client , segment_urls , results , semaphore , REQUEST_MAX_RETRY , estimator , progress_bar
171+ client , segment_urls , temp_dir , semaphore , REQUEST_MAX_RETRY , estimator , progress_bar
159172 )
160173
161- # Write all results to file
162- self ._write_results_to_file (concat_path , results )
174+ # Concatenate all segment files in order
175+ await self ._concatenate_segments (concat_path , len ( segment_urls ) )
163176
164177 except KeyboardInterrupt :
165178 self .download_interrupted = True
166179 print ("\n [red]Download interrupted by user (Ctrl+C)." )
167180
168181 finally :
169- self ._cleanup_resources (None , progress_bar )
182+ self ._cleanup_resources (temp_dir , progress_bar )
170183
171184 self ._verify_download_completion ()
172185 return self ._generate_results (stream_type )
@@ -187,12 +200,9 @@ async def _download_init_segment(self, client, init_url, concat_path, estimator,
187200 with open (concat_path , 'wb' ) as outfile :
188201 if response .status_code == 200 :
189202 outfile .write (response .content )
190- # Update estimator with init segment size
191203 estimator .add_ts_file (len (response .content ))
192204
193205 progress_bar .update (1 )
194-
195- # Update progress bar with estimated info
196206 self ._throttled_progress_update (len (response .content ), estimator , progress_bar )
197207
198208 except Exception as e :
@@ -208,24 +218,31 @@ def _throttled_progress_update(self, content_size: int, estimator, progress_bar)
208218 estimator .update_progress_bar (content_size , progress_bar )
209219 self ._last_progress_update = current_time
210220
211- async def _download_segments_batch (self , client , segment_urls , results , semaphore , max_retry , estimator , progress_bar ):
221+ async def _download_segments_batch (self , client , segment_urls , temp_dir , semaphore , max_retry , estimator , progress_bar ):
212222 """
213- Download a batch of segments and update results .
223+ Download a batch of segments and write them to temp files immediately .
214224 """
215225 async def download_single (url , idx ):
216226 async with semaphore :
217227 headers = {'User-Agent' : get_userAgent ()}
218228
219229 for attempt in range (max_retry ):
220230 if self .download_interrupted :
221- return idx , b'' , attempt
231+ return idx , False , attempt
222232
223233 try :
224234 timeout = min (SEGMENT_MAX_TIMEOUT , 10 + attempt * 3 )
225235 resp = await client .get (url , headers = headers , follow_redirects = True , timeout = timeout )
226236
237+ # Write to temp file immediately
227238 if resp .status_code == 200 :
228- return idx , resp .content , attempt
239+ temp_file = os .path .join (temp_dir , f"seg_{ idx :06d} .tmp" )
240+ async with self .segments_lock :
241+ with open (temp_file , 'wb' ) as f :
242+ f .write (resp .content )
243+ self .segment_files [idx ] = temp_file
244+
245+ return idx , True , attempt , len (resp .content )
229246 else :
230247 if attempt < 2 :
231248 sleep_time = 0.5 + attempt * 0.5
@@ -237,17 +254,16 @@ async def download_single(url, idx):
237254 sleep_time = min (2.0 , 1.1 * (2 ** attempt ))
238255 await asyncio .sleep (sleep_time )
239256
240- return idx , b'' , max_retry
257+ return idx , False , max_retry , 0
241258
242259 # Initial download attempt
243260 tasks = [download_single (url , i ) for i , url in enumerate (segment_urls )]
244261
245262 for coro in asyncio .as_completed (tasks ):
246263 try :
247- idx , data , nretry = await coro
248- results [idx ] = data
264+ idx , success , nretry , size = await coro
249265
250- if data and len ( data ) > 0 :
266+ if success :
251267 self .downloaded_segments .add (idx )
252268 else :
253269 self .info_nFailed += 1
@@ -257,27 +273,23 @@ async def download_single(url, idx):
257273 self .info_nRetry += nretry
258274
259275 progress_bar .update (1 )
260-
261- # Update estimator with segment size
262- estimator .add_ts_file (len (data ))
263-
264- # Update progress bar with estimated info and segment count
265- self ._throttled_progress_update (len (data ), estimator , progress_bar )
276+ estimator .add_ts_file (size )
277+ self ._throttled_progress_update (size , estimator , progress_bar )
266278
267279 except KeyboardInterrupt :
268280 self .download_interrupted = True
269281 print ("\n [red]Download interrupted by user (Ctrl+C)." )
270282 break
271283
272- async def _retry_failed_segments (self , client , segment_urls , results , semaphore , max_retry , estimator , progress_bar ):
284+ async def _retry_failed_segments (self , client , segment_urls , temp_dir , semaphore , max_retry , estimator , progress_bar ):
273285 """
274286 Retry failed segments up to 3 times.
275287 """
276288 max_global_retries = 3
277289 global_retry_count = 0
278290
279291 while self .info_nFailed > 0 and global_retry_count < max_global_retries and not self .download_interrupted :
280- failed_indices = [i for i , data in enumerate ( results ) if not data or len ( data ) == 0 ]
292+ failed_indices = [i for i in range ( len ( segment_urls )) if i not in self . downloaded_segments ]
281293 if not failed_indices :
282294 break
283295
@@ -289,32 +301,37 @@ async def download_single(url, idx):
289301
290302 for attempt in range (max_retry ):
291303 if self .download_interrupted :
292- return idx , b'' , attempt
304+ return idx , False , attempt , 0
293305
294306 try :
295307 timeout = min (SEGMENT_MAX_TIMEOUT , 15 + attempt * 5 )
296308 resp = await client .get (url , headers = headers , timeout = timeout )
297309
310+ # Write to temp file immediately
298311 if resp .status_code == 200 :
299- return idx , resp .content , attempt
312+ temp_file = os .path .join (temp_dir , f"seg_{ idx :06d} .tmp" )
313+ async with self .segments_lock :
314+ with open (temp_file , 'wb' ) as f :
315+ f .write (resp .content )
316+ self .segment_files [idx ] = temp_file
317+
318+ return idx , True , attempt , len (resp .content )
300319 else :
301320 await asyncio .sleep (1.5 * (2 ** attempt ))
302321
303322 except Exception :
304323 await asyncio .sleep (1.5 * (2 ** attempt ))
305324
306- return idx , b'' , max_retry
325+ return idx , False , max_retry , 0
307326
308327 retry_tasks = [download_single (segment_urls [i ], i ) for i in failed_indices ]
309328
310- # Reset nFailed for this round
311329 nFailed_this_round = 0
312330 for coro in asyncio .as_completed (retry_tasks ):
313331 try :
314- idx , data , nretry = await coro
332+ idx , success , nretry , size = await coro
315333
316- if data and len (data ) > 0 :
317- results [idx ] = data
334+ if success :
318335 self .downloaded_segments .add (idx )
319336 else :
320337 nFailed_this_round += 1
@@ -323,9 +340,9 @@ async def download_single(url, idx):
323340 self .info_maxRetry = nretry
324341 self .info_nRetry += nretry
325342
326- progress_bar .update (0 ) # No progress bar increment, already counted
327- estimator .add_ts_file (len ( data ) )
328- self ._throttled_progress_update (len ( data ) , estimator , progress_bar )
343+ progress_bar .update (0 )
344+ estimator .add_ts_file (size )
345+ self ._throttled_progress_update (size , estimator , progress_bar )
329346
330347 except KeyboardInterrupt :
331348 self .download_interrupted = True
@@ -335,21 +352,24 @@ async def download_single(url, idx):
335352 self .info_nFailed = nFailed_this_round
336353 global_retry_count += 1
337354
338- def _write_results_to_file (self , concat_path , results ):
355+ async def _concatenate_segments (self , concat_path , total_segments ):
339356 """
340- Write all downloaded segments to the output file.
357+ Concatenate all segment files in order to the final output file.
341358 """
342359 with open (concat_path , 'ab' ) as outfile :
343- for data in results :
344- if data :
345- outfile .write (data )
360+ for idx in range (total_segments ):
361+ if idx in self .segment_files :
362+ temp_file = self .segment_files [idx ]
363+ if os .path .exists (temp_file ):
364+ with open (temp_file , 'rb' ) as infile :
365+ outfile .write (infile .read ())
346366
347367 def _get_bar_format (self , description : str ) -> str :
348368 """
349369 Generate platform-appropriate progress bar format.
350370 """
351371 return (
352- f"{ Colors .YELLOW } [ DASH] { Colors .CYAN } { description } { Colors .WHITE } : "
372+ f"{ Colors .YELLOW } - DASH{ Colors .CYAN } { description } { Colors .WHITE } : "
353373 f"{ Colors .MAGENTA } {{bar:40}} "
354374 f"{ Colors .LIGHT_GREEN } {{n_fmt}}{ Colors .WHITE } /{ Colors .CYAN } {{total_fmt}} { Colors .LIGHT_MAGENTA } TS { Colors .WHITE } "
355375 f"{ Colors .DARK_GRAY } [{ Colors .YELLOW } {{elapsed}}{ Colors .WHITE } < { Colors .CYAN } {{remaining}}{ Colors .DARK_GRAY } ] "
@@ -383,7 +403,6 @@ def _verify_download_completion(self) -> None:
383403 total = len (self .selected_representation ['segment_urls' ])
384404 completed = getattr (self , 'downloaded_segments' , set ())
385405
386- # If interrupted, skip raising error
387406 if self .download_interrupted :
388407 return
389408
@@ -394,17 +413,28 @@ def _verify_download_completion(self) -> None:
394413 missing = sorted (set (range (total )) - completed )
395414 raise RuntimeError (f"Download incomplete ({ len (completed )/ total :.1%} ). Missing segments: { missing } " )
396415
397- def _cleanup_resources (self , writer_thread , progress_bar : tqdm ) -> None :
416+ def _cleanup_resources (self , temp_dir , progress_bar : tqdm ) -> None :
398417 """
399418 Ensure resource cleanup and final reporting.
400419 """
401420 progress_bar .close ()
421+
422+ # Delete temp segment files
423+ if temp_dir and os .path .exists (temp_dir ):
424+ try :
425+ for temp_file in self .segment_files .values ():
426+ if os .path .exists (temp_file ):
427+ os .remove (temp_file )
428+ os .rmdir (temp_dir )
429+
430+ except Exception as e :
431+ print (f"[yellow]Warning: Could not clean temp directory: { e } " )
432+
402433 if getattr (self , 'info_nFailed' , 0 ) > 0 :
403434 self ._display_error_summary ()
404435
405436 # Clear memory
406- self .buffer = {}
407- self .expected_index = 0
437+ self .segment_files = {}
408438
409439 def _display_error_summary (self ) -> None :
410440 """
0 commit comments