33import argparse
44import collections
55import hashlib
6- import heapq
76import logging
87import multiprocessing
98import os .path
109import re
1110import sqlite3
1211import sys
1312import tarfile
13+ import time
1414import traceback
1515from datetime import datetime
1616from typing import DefaultDict , List , Optional , Set , Tuple
@@ -282,10 +282,10 @@ def extract_database(
282282 if args .workers > 1 :
283283 logger .debug ("Running zstash {} with multiprocessing" .format (cmd ))
284284 failures = multiprocess_extract (
285- args .workers , matches , keep_files , keep , cache , cur , args
285+ args .workers , matches , keep_files , keep , cache , args
286286 )
287287 else :
288- failures = extractFiles (matches , keep_files , keep , cache , cur , args )
288+ failures = extractFiles (matches , keep_files , keep , cache , args , None , cur )
289289
290290 # Close database
291291 logger .debug ("Closing index database" )
@@ -300,7 +300,6 @@ def multiprocess_extract(
300300 keep_files : bool ,
301301 keep_tars : Optional [bool ],
302302 cache : str ,
303- cur : sqlite3 .Cursor ,
304303 args : argparse .Namespace ,
305304) -> List [FilesRow ]:
306305 """
@@ -329,26 +328,12 @@ def multiprocess_extract(
329328 # set the number of workers to the number of tars.
330329 num_workers = min (num_workers , len (tar_to_size ))
331330
332- # For worker i, workers_to_tars[i] is a set of tars
333- # that worker i will work on.
331+ # For worker i, workers_to_tars[i] is a set of tars that worker i will work on.
332+ # Assign tars in round-robin fashion to maintain proper ordering
334333 workers_to_tars : List [set ] = [set () for _ in range (num_workers )]
335- # A min heap, of (work, worker_idx) tuples, work is the size of data
336- # that worker_idx needs to work on.
337- # We can efficiently get the worker with the least amount of work.
338- work_to_workers : List [Tuple [int , int ]] = [(0 , i ) for i in range (num_workers )]
339- heapq .heapify (workers_to_tars )
340-
341- # Using a greedy approach, populate workers_to_tars.
342- for _ , tar in enumerate (tar_to_size ):
343- # The worker with the least work should get the current largest amount of work.
344- workers_work : int
345- worker_idx : int
346- workers_work , worker_idx = heapq .heappop (work_to_workers )
334+ for idx , tar in enumerate (sorted (tar_to_size .keys ())):
335+ worker_idx = idx % num_workers
347336 workers_to_tars [worker_idx ].add (tar )
348- # Add this worker back to the heap, with the new amount of work.
349- worker_tuple : Tuple [float , int ] = (workers_work + tar_to_size [tar ], worker_idx )
350- # FIXME: error: Cannot infer type argument 1 of "heappush"
351- heapq .heappush (work_to_workers , worker_tuple ) # type: ignore
352337
353338 # For worker i, workers_to_matches[i] is a list of
354339 # matches from the database for it to process.
@@ -361,8 +346,15 @@ def multiprocess_extract(
361346 # This worker gets this db_row.
362347 workers_to_matches [workers_idx ].append (db_row )
363348
349+ # Sort each worker's matches by tar to ensure they process in order
350+ for worker_matches in workers_to_matches :
351+ worker_matches .sort (key = lambda t : t .tar )
352+
364353 tar_ordering : List [str ] = sorted ([tar for tar in tar_to_size ])
365- monitor : parallel .PrintMonitor = parallel .PrintMonitor (tar_ordering )
354+ manager = multiprocessing .Manager ()
355+ monitor : parallel .PrintMonitor = parallel .PrintMonitor (
356+ tar_ordering , manager = manager
357+ )
366358
367359 # The return value for extractFiles will be added here.
368360 failure_queue : multiprocessing .Queue [FilesRow ] = multiprocessing .Queue ()
@@ -374,7 +366,7 @@ def multiprocess_extract(
374366 )
375367 process : multiprocessing .Process = multiprocessing .Process (
376368 target = extractFiles ,
377- args = (matches , keep_files , keep_tars , cache , cur , args , worker ),
369+ args = (matches , keep_files , keep_tars , cache , args , worker ),
378370 daemon = True ,
379371 )
380372 process .start ()
@@ -385,10 +377,39 @@ def multiprocess_extract(
385377 # No need to join() each of the processes when doing this,
386378 # because we'll be in this loop until completion.
387379 failures : List [FilesRow ] = []
380+ max_wait_time = 180 # 3 minute timeout for tests
381+ start_time = time .time ()
382+ last_log_time = start_time
383+
388384 while any (p .is_alive () for p in processes ):
385+ elapsed = time .time () - start_time
386+ if elapsed > max_wait_time :
387+ logger .error (
388+ f"Timeout after { elapsed :.1f} s waiting for worker processes. Terminating..."
389+ )
390+ for p in processes :
391+ if p .is_alive ():
392+ logger .error (f"Terminating process { p .pid } " )
393+ p .terminate ()
394+ break
395+
396+ # Log progress every 30 seconds
397+ if time .time () - last_log_time > 30 :
398+ alive_count = sum (1 for p in processes if p .is_alive ())
399+ logger .debug (
400+ f"Still waiting for { alive_count } worker processes after { elapsed :.1f} s"
401+ )
402+ last_log_time = time .time ()
403+
389404 while not failure_queue .empty ():
390405 failures .append (failure_queue .get ())
391406
407+ time .sleep (0.1 ) # Larger sleep to reduce CPU usage
408+
409+ # Collect any remaining failures
410+ while not failure_queue .empty ():
411+ failures .append (failure_queue .get ())
412+
392413 # Sort the failures, since they can come in at any order.
393414 failures .sort (key = lambda t : (t .name , t .tar , t .offset ))
394415 return failures
@@ -479,9 +500,9 @@ def extractFiles( # noqa: C901
479500 keep_files : bool ,
480501 keep_tars : Optional [bool ],
481502 cache : str ,
482- cur : sqlite3 .Cursor ,
483503 args : argparse .Namespace ,
484504 multiprocess_worker : Optional [parallel .ExtractWorker ] = None ,
505+ cur : Optional [sqlite3 .Cursor ] = None ,
485506) -> List [FilesRow ]:
486507 """
487508 Given a list of database rows, extract the files from the
@@ -498,21 +519,59 @@ def extractFiles( # noqa: C901
498519 that called this function.
499520 We need a reference to it so we can signal it to print
500521 the contents of what's in its print queue.
522+
523+ If cur is None (when running in parallel), a new database connection
524+ will be opened for this worker process.
501525 """
526+ try :
527+ result = _extractFiles_impl (
528+ files , keep_files , keep_tars , cache , args , multiprocess_worker , cur
529+ )
530+ return result
531+ except Exception as e :
532+ if multiprocess_worker :
533+ # Make sure we report failures even if there's an exception
534+ sys .stderr .write (f"ERROR: Worker encountered fatal error: { e } \n " )
535+ sys .stderr .flush ()
536+ traceback .print_exc (file = sys .stderr )
537+ for f in files :
538+ multiprocess_worker .failure_queue .put (f )
539+ raise
540+
541+
542+ # FIXME: C901 '_extractFiles_impl' is too complex (42)
543+ def _extractFiles_impl ( # noqa: C901
544+ files : List [FilesRow ],
545+ keep_files : bool ,
546+ keep_tars : Optional [bool ],
547+ cache : str ,
548+ args : argparse .Namespace ,
549+ multiprocess_worker : Optional [parallel .ExtractWorker ] = None ,
550+ cur : Optional [sqlite3 .Cursor ] = None ,
551+ ) -> List [FilesRow ]:
552+ """
553+ Implementation of extractFiles - actual extraction logic.
554+ """
555+ # Open database connection if not provided (parallel case)
556+ if cur is None :
557+ con : sqlite3 .Connection = sqlite3 .connect (
558+ get_db_filename (cache ), detect_types = sqlite3 .PARSE_DECLTYPES
559+ )
560+ cur = con .cursor ()
561+ close_db : bool = True
562+ else :
563+ close_db = False
564+
502565 failures : List [FilesRow ] = []
503566 tfname : str
504567 newtar : bool = True
505568 nfiles : int = len (files )
569+
570+ # For multiprocess workers, we'll set up logging to queue later
571+ # Don't capture logs immediately to avoid blocking on initial setup
572+ setup_logging_later : bool = False
506573 if multiprocess_worker :
507- # All messages to the logger will now be sent to
508- # this queue, instead of sys.stdout.
509- sh = logging .StreamHandler (multiprocess_worker .print_queue )
510- sh .setLevel (logging .DEBUG )
511- formatter : logging .Formatter = logging .Formatter ("%(levelname)s: %(message)s" )
512- sh .setFormatter (formatter )
513- logger .addHandler (sh )
514- # Don't have the logger print to the console as the message come in.
515- logger .propagate = False
574+ setup_logging_later = True
516575
517576 for i in range (nfiles ):
518577 files_row : FilesRow = files [i ]
@@ -521,16 +580,32 @@ def extractFiles( # noqa: C901
521580 if newtar :
522581 newtar = False
523582 tfname = os .path .join (cache , files_row .tar )
524- # Everytime we're extracting a new tar, if running in parallel,
525- # let the process know.
526- # This is to synchronize the print statements.
527- if multiprocess_worker :
583+
584+ # Set up print queue synchronization NOW, after we know which tar we're processing
585+ if setup_logging_later and multiprocess_worker :
586+ # Set the current tar FIRST, before redirecting logging
528587 multiprocess_worker .set_curr_tar (files_row .tar )
529588
530- if config .hpss is not None :
531- hpss : str = config .hpss
589+ # NOW redirect all logger messages to the print queue
590+ sh = logging .StreamHandler (multiprocess_worker .print_queue )
591+ sh .setLevel (logging .DEBUG )
592+ formatter : logging .Formatter = logging .Formatter (
593+ "%(levelname)s: %(message)s"
594+ )
595+ sh .setFormatter (formatter )
596+ logger .addHandler (sh )
597+ logger .propagate = False
598+ setup_logging_later = False
599+ elif multiprocess_worker :
600+ # Logging already set up, just update which tar we're working on
601+ multiprocess_worker .set_curr_tar (files_row .tar )
602+
603+ # Use args.hpss directly - it's always set correctly
604+ if args .hpss is not None :
605+ hpss : str = args .hpss
532606 else :
533- raise TypeError ("Invalid config.hpss={}" .format (config .hpss ))
607+ raise TypeError ("Invalid args.hpss={}" .format (args .hpss ))
608+
534609 tries : int = args .retries + 1
535610 # Set to True to test the `--retries` option with a forced failure.
536611 # Then run `python -m unittest tests.test_extract.TestExtract.testExtractRetries`
@@ -677,7 +752,11 @@ def extractFiles( # noqa: C901
677752 failures .append (files_row )
678753
679754 if multiprocess_worker :
680- multiprocess_worker .print_contents ()
755+ try :
756+ multiprocess_worker .print_contents ()
757+ except (TimeoutError , Exception ):
758+ # If printing fails, continue - work is still done
759+ pass
681760
682761 # Close current archive?
683762 if i == nfiles - 1 or files [i ].tar != files [i + 1 ].tar :
@@ -688,7 +767,38 @@ def extractFiles( # noqa: C901
688767 tar .close ()
689768
690769 if multiprocess_worker :
691- multiprocess_worker .done_enqueuing_output_for_tar (files_row .tar )
770+ try :
771+ sys .stderr .write (
772+ f"DEBUG: Worker calling done_enqueuing for { files_row .tar } \n "
773+ )
774+ sys .stderr .flush ()
775+ multiprocess_worker .done_enqueuing_output_for_tar (files_row .tar )
776+ sys .stderr .write (
777+ f"DEBUG: Worker finished done_enqueuing for { files_row .tar } \n "
778+ )
779+ sys .stderr .flush ()
780+ except TimeoutError as e :
781+ sys .stderr .write (f"DEBUG: Timeout in done_enqueuing: { e } \n " )
782+ sys .stderr .flush ()
783+ pass
784+
785+ # Print the output for this tar now
786+ try :
787+ sys .stderr .write (
788+ f"DEBUG: Worker calling print_all_contents for { files_row .tar } \n "
789+ )
790+ sys .stderr .flush ()
791+ multiprocess_worker .print_all_contents ()
792+ sys .stderr .write (
793+ f"DEBUG: Worker finished print_all_contents for { files_row .tar } \n "
794+ )
795+ sys .stderr .flush ()
796+ except (TimeoutError , Exception ) as e :
797+ sys .stderr .write (
798+ f"DEBUG: Exception in print_all_contents for { files_row .tar } : { e } \n "
799+ )
800+ sys .stderr .flush ()
801+ pass
692802
693803 # Open new archive next time
694804 newtar = True
@@ -700,13 +810,14 @@ def extractFiles( # noqa: C901
700810 else :
701811 raise TypeError ("Invalid tfname={}" .format (tfname ))
702812
703- if multiprocess_worker :
704- # If there are things left to print, print them.
705- multiprocess_worker .print_all_contents ()
813+ # Close database connection if we opened it
814+ if close_db :
815+ cur .close ()
816+ con .close ()
706817
707- # Add the failures to the queue.
708- # When running with multiprocessing, the function multiprocess_extract()
709- # that calls this extractFiles() function will return the failures as a list.
818+ if multiprocess_worker :
819+ # All printing should have happened after each tar
820+ # Just add failures to the queue
710821 for f in failures :
711822 multiprocess_worker .failure_queue .put (f )
712823 return failures
0 commit comments