Skip to content

Commit 391881e

Browse files
Move mkdirs into per-partition processing
Closes #114
1 parent 0e55447 commit 391881e

File tree

2 files changed

+11
-31
lines changed

2 files changed

+11
-31
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# 0.0.5 2024-04-XX
22

33
- Fix bug in schema handling (compressor settings ignored)
4+
- Move creation of ICF field partition directories into per-partition processing.
5+
Remove progress reporting on the init mkdirs step.
46

57
# 0.0.4 2024-04-08
68

bio2zarr/vcf.py

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,8 @@ def __init__(
791791
for vcf_field in icf_metadata.fields:
792792
field_path = get_vcf_field_path(out_path, vcf_field)
793793
field_partition_path = field_path / f"p{partition_index}"
794+
# Should be robust to running explode_partition twice.
795+
field_partition_path.mkdir(exist_ok=True)
794796
transformer = VcfValueTransformer.factory(vcf_field, num_samples)
795797
self.field_writers[vcf_field.full_name] = IcfFieldWriter(
796798
vcf_field,
@@ -890,14 +892,6 @@ def num_columns(self):
890892
return len(self.columns)
891893

892894

893-
def mkdir_with_progress(path):
894-
logger.debug(f"mkdir f{path}")
895-
# NOTE we may have race-conditions here, I'm not sure. Hopefully allowing
896-
# parents=True will take care of it.
897-
path.mkdir(parents=True)
898-
core.update_progress(1)
899-
900-
901895
class IntermediateColumnarFormatWriter:
902896
def __init__(self, path):
903897
self.path = pathlib.Path(path)
@@ -940,7 +934,7 @@ def init(
940934
# dependencies as well.
941935
self.metadata.provenance = {"source": f"bio2zarr-{provenance.__version__}"}
942936

943-
self.mkdirs(worker_processes, show_progress=show_progress)
937+
self.mkdirs()
944938

945939
# Note: this is needed for the current version of the vcfzarr spec, but it's
946940
# probably going to be dropped.
@@ -955,30 +949,14 @@ def init(
955949
json.dump(self.metadata.asdict(), f, indent=4)
956950
return self.num_partitions
957951

958-
def mkdirs(self, worker_processes=1, show_progress=False):
959-
num_dirs = len(self.metadata.fields) * self.num_partitions
960-
logger.info(f"Creating {num_dirs} directories")
952+
def mkdirs(self):
953+
num_dirs = len(self.metadata.fields)
954+
logger.info(f"Creating {num_dirs} field directories")
961955
self.path.mkdir()
962956
self.wip_path.mkdir()
963-
# Due to high latency batch system filesystems, we create all the directories in
964-
# parallel
965-
progress_config = core.ProgressConfig(
966-
total=num_dirs,
967-
units="dirs",
968-
title="Mkdirs",
969-
show=show_progress,
970-
)
971-
with core.ParallelWorkManager(
972-
worker_processes=worker_processes, progress_config=progress_config
973-
) as manager:
974-
for field in self.metadata.fields:
975-
col_path = get_vcf_field_path(self.path, field)
976-
# Don't bother trying to count the intermediate directories towards
977-
# progress
978-
manager.submit(col_path.mkdir, parents=True)
979-
for j in range(self.num_partitions):
980-
part_path = col_path / f"p{j}"
981-
manager.submit(mkdir_with_progress, part_path)
957+
for field in self.metadata.fields:
958+
col_path = get_vcf_field_path(self.path, field)
959+
col_path.mkdir(parents=True)
982960

983961
def load_partition_summaries(self):
984962
summaries = []

0 commit comments

Comments
 (0)