Skip to content

Commit d619ff1

Browse files
committed
Try 12
1 parent d287742 commit d619ff1

File tree

2 files changed

+44
-15
lines changed

2 files changed

+44
-15
lines changed

zstash/extract.py

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ def extractFiles( # noqa: C901
548548
raise
549549

550550

551-
# FIXME: C901 '_extractFiles_impl' is too complex (42)
551+
# FIXME: C901 '_extractFiles_impl' is too complex (51)
552552
def _extractFiles_impl( # noqa: C901
553553
files: List[FilesRow],
554554
keep_files: bool,
@@ -575,16 +575,12 @@ def _extractFiles_impl( # noqa: C901
575575
tfname: str
576576
newtar: bool = True
577577
nfiles: int = len(files)
578+
579+
# For multiprocess workers, we'll set up logging to queue later
580+
# Don't capture logs immediately to avoid blocking on initial setup
581+
setup_logging_later: bool = False
578582
if multiprocess_worker:
579-
# All messages to the logger will now be sent to
580-
# this queue, instead of sys.stdout.
581-
sh = logging.StreamHandler(multiprocess_worker.print_queue)
582-
sh.setLevel(logging.DEBUG)
583-
formatter: logging.Formatter = logging.Formatter("%(levelname)s: %(message)s")
584-
sh.setFormatter(formatter)
585-
logger.addHandler(sh)
586-
# Don't have the logger print to the console as the message come in.
587-
logger.propagate = False
583+
setup_logging_later = True
588584

589585
for i in range(nfiles):
590586
files_row: FilesRow = files[i]
@@ -593,11 +589,31 @@ def _extractFiles_impl( # noqa: C901
593589
if newtar:
594590
newtar = False
595591
tfname = os.path.join(cache, files_row.tar)
592+
593+
# Set up print queue synchronization NOW, after we know which tar we're processing
594+
if setup_logging_later and multiprocess_worker:
595+
# All messages to the logger will now be sent to
596+
# this queue, instead of sys.stdout.
597+
sh = logging.StreamHandler(multiprocess_worker.print_queue)
598+
sh.setLevel(logging.DEBUG)
599+
formatter: logging.Formatter = logging.Formatter(
600+
"%(levelname)s: %(message)s"
601+
)
602+
sh.setFormatter(formatter)
603+
logger.addHandler(sh)
604+
# Don't have the logger print to the console as the message come in.
605+
logger.propagate = False
606+
setup_logging_later = False
607+
596608
# Everytime we're extracting a new tar, if running in parallel,
597609
# let the process know.
598610
# This is to synchronize the print statements.
599611
if multiprocess_worker:
600-
multiprocess_worker.set_curr_tar(files_row.tar)
612+
try:
613+
multiprocess_worker.set_curr_tar(files_row.tar)
614+
except Exception:
615+
# If setting current tar fails, continue anyway
616+
pass
601617

602618
# Use args.hpss directly - it's always set correctly
603619
if args.hpss is not None:
@@ -751,7 +767,11 @@ def _extractFiles_impl( # noqa: C901
751767
failures.append(files_row)
752768

753769
if multiprocess_worker:
754-
multiprocess_worker.print_contents()
770+
try:
771+
multiprocess_worker.print_contents()
772+
except (TimeoutError, Exception):
773+
# If printing fails, continue - work is still done
774+
pass
755775

756776
# Close current archive?
757777
if i == nfiles - 1 or files[i].tar != files[i + 1].tar:
@@ -762,7 +782,12 @@ def _extractFiles_impl( # noqa: C901
762782
tar.close()
763783

764784
if multiprocess_worker:
765-
multiprocess_worker.done_enqueuing_output_for_tar(files_row.tar)
785+
try:
786+
multiprocess_worker.done_enqueuing_output_for_tar(files_row.tar)
787+
except TimeoutError:
788+
# If we timeout trying to signal completion, just continue
789+
# The monitor synchronization failed but the work is done
790+
pass
766791

767792
# Open new archive next time
768793
newtar = True
@@ -781,7 +806,11 @@ def _extractFiles_impl( # noqa: C901
781806

782807
if multiprocess_worker:
783808
# If there are things left to print, print them.
784-
multiprocess_worker.print_all_contents()
809+
try:
810+
multiprocess_worker.print_all_contents()
811+
except (TimeoutError, Exception):
812+
# If final printing fails, at least report failures
813+
pass
785814

786815
# Add the failures to the queue.
787816
# When running with multiprocessing, the function multiprocess_extract()

zstash/parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def wait_turn(
7373
with self._cv:
7474
attempted: bool = False
7575
workers_curr_tar_bytes = workers_curr_tar.encode("utf-8")
76-
max_wait_time = 120.0 # Maximum 2 minutes total wait
76+
max_wait_time = 10.0 # Maximum 10 seconds total wait - fail fast
7777
start_time = time.time()
7878

7979
while self._current_tar.value != workers_curr_tar_bytes:

0 commit comments

Comments
 (0)