Skip to content

Commit b62bb60

Browse files
Merge pull request #108 from benjeffery/parallel_dir
Parallel directory creation
2 parents c7b43ed + 6028ce9 commit b62bb60

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

bio2zarr/vcf.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -932,10 +932,10 @@ def init(
932932
# dependencies as well.
933933
self.metadata.provenance = {"source": f"bio2zarr-{provenance.__version__}"}
934934

935-
self.mkdirs()
935+
self.mkdirs(worker_processes)
936936

937937
# Note: this is needed for the current version of the vcfzarr spec, but it's
938-
# probably goint to be dropped.
938+
# probably going to be dropped.
939939
# https://github.com/pystatgen/vcf-zarr-spec/issues/15
940940
# May be useful to keep lying around still though?
941941
logger.info(f"Writing VCF header")
@@ -947,20 +947,30 @@ def init(
947947
json.dump(self.metadata.asdict(), f, indent=4)
948948
return self.num_partitions
949949

950-
def mkdirs(self):
951-
# TODO add worker_processes here and do this with the ParallelWorkManager
950+
def mkdirs(self, worker_processes=1):
952951
logger.info(
953952
f"Creating {len(self.metadata.fields) * self.num_partitions} directories"
954953
)
955954
self.path.mkdir()
956955
self.wip_path.mkdir()
957-
for field in self.metadata.fields:
958-
col_path = get_vcf_field_path(self.path, field)
959-
logger.debug(f"Make directories for {field.full_name} at {col_path}")
960-
col_path.mkdir(parents=True)
961-
for j in range(self.num_partitions):
962-
part_path = col_path / f"p{j}"
963-
part_path.mkdir()
956+
# Due to high latency batch system filesystems, we create all the directories in
957+
# parallel
958+
progress_config = core.ProgressConfig(
959+
total=len(self.metadata.fields) * self.num_partitions,
960+
units="dir",
961+
title="Creating directories",
962+
show=True
963+
)
964+
with core.ParallelWorkManager(
965+
worker_processes=worker_processes,
966+
progress_config=progress_config
967+
) as manager:
968+
for field in self.metadata.fields:
969+
col_path = get_vcf_field_path(self.path, field)
970+
manager.submit(col_path.mkdir, parents=True)
971+
for j in range(self.num_partitions):
972+
part_path = col_path / f"p{j}"
973+
manager.submit(part_path.mkdir, parents=True)
964974

965975
def load_partition_summaries(self):
966976
summaries = []

0 commit comments

Comments
 (0)