Skip to content

Commit ed48856

Browse files
committed
Fix progress bars and missing partition errors
1 parent fdaaefb commit ed48856

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

bio2zarr/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,18 @@ def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
 @click.command
 @click.argument("vcfs", nargs=-1, required=True)
 @click.argument("out_path", type=click.Path())
+@click.argument("num_partitions", type=int, required=True)
 @verbose
 @worker_processes
-def explode_init(vcfs, out_path, verbose, worker_processes):
+def explode_init(vcfs, out_path, num_partitions, verbose, worker_processes):
     """
     Initial step for parallel conversion of VCF(s) to columnar intermediate format
     """
     setup_logging(verbose)
     vcf.explode_init(
         vcfs,
         out_path,
+        num_partitions=num_partitions,
         worker_processes=worker_processes,
         show_progress=True,
     )

bio2zarr/vcf.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -896,12 +896,20 @@ def write_header(self):
 
     def load_partition_summaries(self):
         summaries = []
+        not_found = []
         for j in range(self.num_partitions):
-            with open(self.path / f"partition_{j}_metadata.json") as f:
-                summary = json.load(f)
-                for k, v in summary['field_summaries'].items():
-                    summary['field_summaries'][k] = VcfFieldSummary(**v)
-                summaries.append(summary)
+            try:
+                with open(self.path / f"partition_{j}_metadata.json") as f:
+                    summary = json.load(f)
+                    for k, v in summary['field_summaries'].items():
+                        summary['field_summaries'][k] = VcfFieldSummary(**v)
+                    summaries.append(summary)
+            except FileNotFoundError:
+                not_found.append(j)
+        if not_found:
+            raise FileNotFoundError(
+                f"Partition metadata not found for {len(not_found)} partitions: {not_found}"
+            )
         return summaries
 
 
@@ -1008,8 +1016,13 @@ def convert_slice(self, start, stop, *, worker_processes=1, show_progress=False,
             f"Exploding {self.num_columns} columns {self.metadata.num_records} variants "
             f"{self.num_samples} samples"
         )
+        if start < 0:
+            raise ValueError(f"start={start} must be non-negative")
+        if stop > self.num_partitions:
+            raise ValueError(f"stop={stop} must be less than the number of partitions")
+        num_records_to_progress = sum([partition.num_records for partition in self.metadata.partitions[start:stop]])
         progress_config = core.ProgressConfig(
-            total=self.metadata.num_records,
+            total=num_records_to_progress,
             units="vars",
             title="Explode",
             show=show_progress,

0 commit comments

Comments
 (0)