Skip to content

Commit 107174a

Browse files
Will-Tyler authored and jeromekelleher committed
Parallelize chunk writing
1 parent df24a55 commit 107174a

File tree

1 file changed

+51
-25
lines changed

1 file changed

+51
-25
lines changed

vcztools/vcf_writer.py

Lines changed: 51 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
import io
44
import re
55
import sys
6+
from collections import deque
67
from datetime import datetime
78

89
import numpy as np
@@ -189,9 +190,12 @@ def write_vcf(
189190

190191
if variant_regions is None and variant_targets is None:
191192
# no regions or targets selected
193+
executor = concurrent.futures.ThreadPoolExecutor()
194+
preceding_future = None
192195
for v_chunk in range(pos.cdata_shape[0]):
193196
v_mask_chunk = filter_evaluator(v_chunk) if filter_evaluator else None
194-
c_chunk_to_vcf(
197+
future = executor.submit(
198+
c_chunk_to_vcf,
195199
root,
196200
v_chunk,
197201
v_mask_chunk,
@@ -201,7 +205,16 @@ def write_vcf(
201205
output,
202206
drop_genotypes=drop_genotypes,
203207
no_update=no_update,
208+
executor=executor,
204209
)
210+
if preceding_future:
211+
concurrent.futures.wait((preceding_future,))
212+
preceding_future = future
213+
if preceding_future:
214+
concurrent.futures.wait((preceding_future,))
215+
# We can't use a with-statement because the threads
216+
# themselves submit tasks to the executor.
217+
executor.shutdown()
205218
else:
206219
contigs_u = root["contig_id"][:].astype("U").tolist()
207220
regions = parse_regions(variant_regions, contigs_u)
@@ -246,7 +259,8 @@ def write_vcf(
246259
# Use zarr arrays to get mask chunks aligned with the main data
247260
# for convenience.
248261
z_variant_mask = zarr.array(variant_mask, chunks=pos.chunks[0])
249-
262+
executor = concurrent.futures.ThreadPoolExecutor()
263+
preceding_future = None
250264
for i, v_chunk in enumerate(chunk_indexes):
251265
v_mask_chunk = z_variant_mask.blocks[i]
252266

@@ -255,7 +269,8 @@ def write_vcf(
255269
v_mask_chunk, filter_evaluator(v_chunk)
256270
)
257271
if np.any(v_mask_chunk):
258-
c_chunk_to_vcf(
272+
future = executor.submit(
273+
c_chunk_to_vcf,
259274
root,
260275
v_chunk,
261276
v_mask_chunk,
@@ -265,7 +280,16 @@ def write_vcf(
265280
output,
266281
drop_genotypes=drop_genotypes,
267282
no_update=no_update,
283+
executor=executor,
268284
)
285+
if preceding_future:
286+
concurrent.futures.wait((preceding_future,))
287+
preceding_future = future
288+
if preceding_future:
289+
concurrent.futures.wait((preceding_future,))
290+
# We can't use a with-statement because the threads
291+
# themselves submit tasks to the executor.
292+
executor.shutdown()
269293

270294

271295
def get_vchunk_array(zarray, v_chunk, mask, samples_selection=None):
@@ -292,6 +316,7 @@ def c_chunk_to_vcf(
292316
*,
293317
drop_genotypes,
294318
no_update,
319+
executor: concurrent.futures.Executor,
295320
):
296321
chrom = None
297322
pos = None
@@ -384,28 +409,29 @@ def load_gt_phased():
384409
nonlocal gt_phased
385410
gt_phased = np.zeros_like(gt, dtype=bool)
386411

387-
with concurrent.futures.ThreadPoolExecutor() as executor:
388-
executor.submit(load_chrom)
389-
executor.submit(load_pos)
390-
executor.submit(load_id)
391-
executor.submit(load_alleles)
392-
executor.submit(load_qual)
393-
executor.submit(load_filter)
394-
395-
for name, zarray in root.items():
396-
if (
397-
name.startswith("call_")
398-
and not name.startswith("call_genotype")
399-
and num_samples != 0
400-
):
401-
executor.submit(load_format_field, name, zarray)
402-
if num_samples is None:
403-
num_samples = zarray.shape[1]
404-
elif name.startswith("variant_") and name not in RESERVED_VARIABLE_NAMES:
405-
executor.submit(load_info_field, name, zarray)
406-
407-
executor.submit(load_gt)
408-
executor.submit(load_gt_phased)
412+
futures = deque()
413+
futures.append(executor.submit(load_chrom))
414+
futures.append(executor.submit(load_pos))
415+
futures.append(executor.submit(load_id))
416+
futures.append(executor.submit(load_alleles))
417+
futures.append(executor.submit(load_qual))
418+
futures.append(executor.submit(load_filter))
419+
420+
for name, zarray in root.items():
421+
if (
422+
name.startswith("call_")
423+
and not name.startswith("call_genotype")
424+
and num_samples != 0
425+
):
426+
futures.append(executor.submit(load_format_field, name, zarray))
427+
if num_samples is None:
428+
num_samples = zarray.shape[1]
429+
elif name.startswith("variant_") and name not in RESERVED_VARIABLE_NAMES:
430+
futures.append(executor.submit(load_info_field, name, zarray))
431+
432+
futures.append(executor.submit(load_gt))
433+
futures.append(executor.submit(load_gt_phased))
434+
concurrent.futures.wait(futures)
409435

410436
ref = alleles[:, 0].astype("S")
411437
alt = alleles[:, 1:].astype("S")

0 commit comments

Comments
 (0)