Skip to content

Commit 6b682a6

Browse files
Fix directory progress
1 parent aece28f commit 6b682a6

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

bio2zarr/core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ def wait_on_futures(futures):
5050
cancel_futures(futures)
5151
if isinstance(exception, cf.process.BrokenProcessPool):
5252
raise RuntimeError(
53-
"Worker process died: you may have run out of memory") from exception
53+
"Worker process died: you may have run out of memory"
54+
) from exception
5455
else:
5556
raise exception
5657

bio2zarr/vcf.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,15 @@ def num_columns(self):
890890
return len(self.columns)
891891

892892

893+
894+
def mkdir_with_progress(path):
895+
logger.debug(f"mkdir f{path}")
896+
# NOTE we may have race-conditions here, I'm not sure. Hopefully allowing
897+
# parents=True will take care of it.
898+
path.mkdir(parents=True)
899+
core.update_progress(1)
900+
901+
893902
class IntermediateColumnarFormatWriter:
894903
def __init__(self, path):
895904
self.path = pathlib.Path(path)
@@ -932,7 +941,7 @@ def init(
932941
# dependencies as well.
933942
self.metadata.provenance = {"source": f"bio2zarr-{provenance.__version__}"}
934943

935-
self.mkdirs(worker_processes)
944+
self.mkdirs(worker_processes, show_progress=show_progress)
936945

937946
# Note: this is needed for the current version of the vcfzarr spec, but it's
938947
# probably going to be dropped.
@@ -947,30 +956,30 @@ def init(
947956
json.dump(self.metadata.asdict(), f, indent=4)
948957
return self.num_partitions
949958

950-
def mkdirs(self, worker_processes=1):
951-
logger.info(
952-
f"Creating {len(self.metadata.fields) * self.num_partitions} directories"
953-
)
959+
def mkdirs(self, worker_processes=1, show_progress=False):
960+
num_dirs = len(self.metadata.fields) * self.num_partitions
961+
logger.info(f"Creating {num_dirs} directories")
954962
self.path.mkdir()
955963
self.wip_path.mkdir()
956964
# Due to high latency batch system filesystems, we create all the directories in
957965
# parallel
958966
progress_config = core.ProgressConfig(
959-
total=len(self.metadata.fields) * self.num_partitions,
960-
units="dir",
961-
title="Creating directories",
962-
show=True
967+
total=num_dirs,
968+
units="dirs",
969+
title="Mkdirs",
970+
show=show_progress,
963971
)
964972
with core.ParallelWorkManager(
965-
worker_processes=worker_processes,
966-
progress_config=progress_config
973+
worker_processes=worker_processes, progress_config=progress_config
967974
) as manager:
968975
for field in self.metadata.fields:
969976
col_path = get_vcf_field_path(self.path, field)
977+
# Don't bother trying to count the intermediate directories towards
978+
# progress
970979
manager.submit(col_path.mkdir, parents=True)
971980
for j in range(self.num_partitions):
972981
part_path = col_path / f"p{j}"
973-
manager.submit(part_path.mkdir, parents=True)
982+
manager.submit(mkdir_with_progress, part_path)
974983

975984
def load_partition_summaries(self):
976985
summaries = []

0 commit comments

Comments
 (0)