Skip to content

Commit 2b0d1a4

Browse files
Make dexplode-partition synchronous
Closes #149. Fix up tests for refined VCF partitioning. Simplify some codepaths, fix tests.
1 parent e23e5dc commit 2b0d1a4

File tree

3 files changed

+21
-49
lines changed

3 files changed

+21
-49
lines changed

bio2zarr/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def dexplode_partition(icf_path, partition, verbose):
233233
from 0 (inclusive) to the number of partitions returned by dexplode_init (exclusive).
234234
"""
235235
setup_logging(verbose)
236-
vcf.explode_partition(icf_path, partition, show_progress=False)
236+
vcf.explode_partition(icf_path, partition)
237237

238238

239239
@click.command

bio2zarr/vcf.py

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -325,12 +325,15 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
325325
all_partitions.append(partition)
326326
total_records += metadata.num_records
327327
metadata.num_records = 0
328+
metadata.partitions = []
328329

329330
icf_metadata, header = results[0]
330331
for metadata, _ in results[1:]:
331332
if metadata != icf_metadata:
332333
raise ValueError("Incompatible VCF chunks")
333334

335+
# Note: this will be infinity here if any of the chunks has an index
336+
# that doesn't keep track of the number of records per-contig
334337
icf_metadata.num_records = total_records
335338

336339
# Sort by contig (in the order they appear in the header) first,
@@ -1057,32 +1060,20 @@ def process_partition(self, partition_index):
10571060
f"{num_records} records"
10581061
)
10591062

1060-
def process_partition_slice(
1061-
self,
1062-
start,
1063-
stop,
1064-
*,
1065-
worker_processes=1,
1066-
show_progress=False,
1067-
):
1063+
def explode(self, *, worker_processes=1, show_progress=False):
10681064
self.load_metadata()
1069-
if start == 0 and stop == self.num_partitions:
1070-
num_records = self.metadata.num_records
1071-
if np.isinf(num_records):
1072-
logger.warning(
1073-
"Total records unknown, cannot show progress; "
1074-
"reindex VCFs with bcftools index to fix"
1075-
)
1076-
num_records = None
1077-
else:
1078-
# We only know the number of records if all partitions are done at once,
1079-
# and we signal this to tqdm by passing None as the total.
1065+
num_records = self.metadata.num_records
1066+
if np.isinf(num_records):
1067+
logger.warning(
1068+
"Total records unknown, cannot show progress; "
1069+
"reindex VCFs with bcftools index to fix"
1070+
)
10801071
num_records = None
10811072
num_columns = len(self.metadata.fields)
10821073
num_samples = len(self.metadata.samples)
10831074
logger.info(
10841075
f"Exploding columns={num_columns} samples={num_samples}; "
1085-
f"partitions={stop - start} "
1076+
f"partitions={self.num_partitions} "
10861077
f"variants={'unknown' if num_records is None else num_records}"
10871078
)
10881079
progress_config = core.ProgressConfig(
@@ -1092,30 +1083,16 @@ def process_partition_slice(
10921083
show=show_progress,
10931084
)
10941085
with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
1095-
for j in range(start, stop):
1086+
for j in range(self.num_partitions):
10961087
pwm.submit(self.process_partition, j)
10971088

1098-
def explode(self, *, worker_processes=1, show_progress=False):
1099-
self.load_metadata()
1100-
return self.process_partition_slice(
1101-
0,
1102-
self.num_partitions,
1103-
worker_processes=worker_processes,
1104-
show_progress=show_progress,
1105-
)
1106-
1107-
def explode_partition(self, partition, *, show_progress=False, worker_processes=1):
1089+
def explode_partition(self, partition):
11081090
self.load_metadata()
11091091
if partition < 0 or partition >= self.num_partitions:
11101092
raise ValueError(
11111093
"Partition index must be in the range 0 <= index < num_partitions"
11121094
)
1113-
return self.process_partition_slice(
1114-
partition,
1115-
partition + 1,
1116-
worker_processes=worker_processes,
1117-
show_progress=show_progress,
1118-
)
1095+
self.process_partition(partition)
11191096

11201097
def finalise(self):
11211098
self.load_metadata()
@@ -1190,14 +1167,9 @@ def explode_init(
11901167
)
11911168

11921169

1193-
# NOTE only including worker_processes here so we can use the 0 option to get the
1194-
# work done synchronously and so we can get test coverage on it. Should find a
1195-
# better way to do this.
1196-
def explode_partition(icf_path, partition, *, show_progress=False, worker_processes=1):
1170+
def explode_partition(icf_path, partition):
11971171
writer = IntermediateColumnarFormatWriter(icf_path)
1198-
writer.explode_partition(
1199-
partition, show_progress=show_progress, worker_processes=worker_processes
1200-
)
1172+
writer.explode_partition(partition)
12011173

12021174

12031175
def explode_finalise(icf_path):

tests/test_cli.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
show_progress=True,
1515
)
1616

17-
DEFAULT_DEXPLODE_PARTITION_ARGS = dict(show_progress=False)
17+
DEFAULT_DEXPLODE_PARTITION_ARGS = dict()
1818

1919
DEFAULT_DEXPLODE_INIT_ARGS = dict(
2020
worker_processes=1,
@@ -614,10 +614,10 @@ def test_num_parts(self):
614614
cli.vcf_partition, [path, "-n", "5"], catch_exceptions=False
615615
)
616616
assert list(result.stdout.splitlines()) == [
617-
"20:1-278528",
617+
"20:60001-278528",
618618
"20:278529-442368",
619-
"20:442369-638976",
620-
"20:638977-819200",
619+
"20:442381-638976",
620+
"20:638982-819200",
621621
"20:819201-",
622622
]
623623

0 commit comments

Comments
 (0)