Skip to content

Commit 363b655

Browse files
Will-Tylerjeromekelleher
authored andcommitted
Load data on same thread
1 parent 908d1b6 commit 363b655

File tree

1 file changed

+75
-148
lines changed

1 file changed

+75
-148
lines changed

vcztools/vcf_writer.py

Lines changed: 75 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io
44
import re
55
import sys
6-
from collections import deque
76
from datetime import datetime
87

98
import numpy as np
@@ -190,32 +189,28 @@ def write_vcf(
190189

191190
if variant_regions is None and variant_targets is None:
192191
# no regions or targets selected
193-
executor = concurrent.futures.ThreadPoolExecutor()
194-
preceding_future = None
195-
for v_chunk in range(pos.cdata_shape[0]):
196-
v_mask_chunk = filter_evaluator(v_chunk) if filter_evaluator else None
197-
future = executor.submit(
198-
c_chunk_to_vcf,
199-
root,
200-
v_chunk,
201-
v_mask_chunk,
202-
samples_selection,
203-
contigs,
204-
filters,
205-
output,
206-
drop_genotypes=drop_genotypes,
207-
no_update=no_update,
208-
executor=executor,
209-
preceding_future=preceding_future,
210-
)
211-
if preceding_future:
212-
concurrent.futures.wait((preceding_future,))
213-
preceding_future = future
214-
if preceding_future:
215-
concurrent.futures.wait((preceding_future,))
216-
# We can't use a with-statement because the threads
217-
# themselves submit tasks to the executor.
218-
executor.shutdown()
192+
with concurrent.futures.ThreadPoolExecutor() as executor:
193+
preceding_future = None
194+
for v_chunk in range(pos.cdata_shape[0]):
195+
v_mask_chunk = (
196+
filter_evaluator(v_chunk) if filter_evaluator else None
197+
)
198+
future = executor.submit(
199+
c_chunk_to_vcf,
200+
root,
201+
v_chunk,
202+
v_mask_chunk,
203+
samples_selection,
204+
contigs,
205+
filters,
206+
output,
207+
drop_genotypes=drop_genotypes,
208+
no_update=no_update,
209+
preceding_future=preceding_future,
210+
)
211+
if preceding_future:
212+
concurrent.futures.wait((preceding_future,))
213+
preceding_future = future
219214
else:
220215
contigs_u = root["contig_id"][:].astype("U").tolist()
221216
regions = parse_regions(variant_regions, contigs_u)
@@ -260,38 +255,32 @@ def write_vcf(
260255
# Use zarr arrays to get mask chunks aligned with the main data
261256
# for convenience.
262257
z_variant_mask = zarr.array(variant_mask, chunks=pos.chunks[0])
263-
executor = concurrent.futures.ThreadPoolExecutor()
264-
preceding_future = None
265-
for i, v_chunk in enumerate(chunk_indexes):
266-
v_mask_chunk = z_variant_mask.blocks[i]
267-
268-
if filter_evaluator and np.any(v_mask_chunk):
269-
v_mask_chunk = np.logical_and(
270-
v_mask_chunk, filter_evaluator(v_chunk)
271-
)
272-
if np.any(v_mask_chunk):
273-
future = executor.submit(
274-
c_chunk_to_vcf,
275-
root,
276-
v_chunk,
277-
v_mask_chunk,
278-
samples_selection,
279-
contigs,
280-
filters,
281-
output,
282-
drop_genotypes=drop_genotypes,
283-
no_update=no_update,
284-
executor=executor,
285-
preceding_future=preceding_future,
286-
)
287-
if preceding_future:
288-
concurrent.futures.wait((preceding_future,))
289-
preceding_future = future
290-
if preceding_future:
291-
concurrent.futures.wait((preceding_future,))
292-
# We can't use a with-statement because the threads
293-
# themselves submit tasks to the executor.
294-
executor.shutdown()
258+
with concurrent.futures.ThreadPoolExecutor() as executor:
259+
preceding_future = None
260+
for i, v_chunk in enumerate(chunk_indexes):
261+
v_mask_chunk = z_variant_mask.blocks[i]
262+
263+
if filter_evaluator and np.any(v_mask_chunk):
264+
v_mask_chunk = np.logical_and(
265+
v_mask_chunk, filter_evaluator(v_chunk)
266+
)
267+
if np.any(v_mask_chunk):
268+
future = executor.submit(
269+
c_chunk_to_vcf,
270+
root,
271+
v_chunk,
272+
v_mask_chunk,
273+
samples_selection,
274+
contigs,
275+
filters,
276+
output,
277+
drop_genotypes=drop_genotypes,
278+
no_update=no_update,
279+
preceding_future=preceding_future,
280+
)
281+
if preceding_future:
282+
concurrent.futures.wait((preceding_future,))
283+
preceding_future = future
295284

296285

297286
def get_vchunk_array(zarray, v_chunk, mask, samples_selection=None):
@@ -318,125 +307,63 @@ def c_chunk_to_vcf(
318307
*,
319308
drop_genotypes,
320309
no_update,
321-
executor: concurrent.futures.Executor,
322310
preceding_future: Optional[concurrent.futures.Future] = None,
323311
):
324-
chrom = None
325-
pos = None
326-
id = None
327-
alleles = None
328-
qual = None
329-
filter_ = None
312+
chrom = contigs[get_vchunk_array(root.variant_contig, v_chunk, v_mask_chunk)]
313+
# TODO check we don't truncate silently by doing this
314+
pos = get_vchunk_array(root.variant_position, v_chunk, v_mask_chunk).astype(
315+
np.int32
316+
)
317+
id = get_vchunk_array(root.variant_id, v_chunk, v_mask_chunk).astype("S")
318+
alleles = get_vchunk_array(root.variant_allele, v_chunk, v_mask_chunk)
319+
qual = get_vchunk_array(root.variant_quality, v_chunk, v_mask_chunk)
320+
filter_ = get_vchunk_array(root.variant_filter, v_chunk, v_mask_chunk)
330321
format_fields = {}
331322
info_fields = {}
332323
num_samples = len(samples_selection) if samples_selection is not None else None
333324
gt = None
334325
gt_phased = None
335326

336-
def load_chrom():
337-
nonlocal chrom
338-
chrom = contigs[get_vchunk_array(root.variant_contig, v_chunk, v_mask_chunk)]
339-
340-
def load_pos():
341-
nonlocal pos
342-
# TODO check we don't truncate silently by doing this
343-
pos = get_vchunk_array(root.variant_position, v_chunk, v_mask_chunk).astype(
344-
np.int32
345-
)
346-
347-
def load_id():
348-
nonlocal id
349-
id = get_vchunk_array(root.variant_id, v_chunk, v_mask_chunk).astype("S")
350-
351-
def load_alleles():
352-
nonlocal alleles
353-
alleles = get_vchunk_array(root.variant_allele, v_chunk, v_mask_chunk)
354-
355-
def load_qual():
356-
nonlocal qual
357-
qual = get_vchunk_array(root.variant_quality, v_chunk, v_mask_chunk)
358-
359-
def load_filter():
360-
nonlocal filter_
361-
filter_ = get_vchunk_array(root.variant_filter, v_chunk, v_mask_chunk)
362-
363-
def load_format_field(name, zarray):
364-
nonlocal format_fields, v_chunk, v_mask_chunk, samples_selection
365-
vcf_name = name[len("call_") :]
366-
format_fields[vcf_name] = get_vchunk_array(
367-
zarray, v_chunk, v_mask_chunk, samples_selection
368-
)
369-
370-
def load_info_field(name, zarray):
371-
nonlocal info_fields, v_chunk, v_mask_chunk
372-
vcf_name = name[len("variant_") :]
373-
info_fields[vcf_name] = get_vchunk_array(zarray, v_chunk, v_mask_chunk)
374-
375-
def load_gt():
376-
pass
377-
378-
def load_gt_phased():
379-
pass
380-
381327
if "call_genotype" in root and not drop_genotypes:
382328
if samples_selection is not None and num_samples != 0:
383-
384-
def load_gt():
385-
nonlocal gt
386-
gt = get_vchunk_array(
387-
root["call_genotype"], v_chunk, v_mask_chunk, samples_selection
388-
)
329+
gt = get_vchunk_array(
330+
root["call_genotype"], v_chunk, v_mask_chunk, samples_selection
331+
)
389332
else:
390-
391-
def load_gt():
392-
nonlocal gt
393-
gt = get_vchunk_array(root["call_genotype"], v_chunk, v_mask_chunk)
333+
gt = get_vchunk_array(root["call_genotype"], v_chunk, v_mask_chunk)
394334

395335
if (
396336
"call_genotype_phased" in root
397337
and not drop_genotypes
398338
and (samples_selection is None or num_samples > 0)
399339
):
400-
401-
def load_gt_phased():
402-
nonlocal gt_phased
403-
gt_phased = get_vchunk_array(
404-
root["call_genotype_phased"],
405-
v_chunk,
406-
v_mask_chunk,
407-
samples_selection,
408-
)
340+
gt_phased = get_vchunk_array(
341+
root["call_genotype_phased"],
342+
v_chunk,
343+
v_mask_chunk,
344+
samples_selection,
345+
)
409346
else:
410-
411-
def load_gt_phased():
412-
nonlocal gt_phased
413-
gt_phased = np.zeros_like(gt, dtype=bool)
414-
415-
futures = deque()
416-
futures.append(executor.submit(load_chrom))
417-
futures.append(executor.submit(load_pos))
418-
futures.append(executor.submit(load_id))
419-
futures.append(executor.submit(load_alleles))
420-
futures.append(executor.submit(load_qual))
421-
futures.append(executor.submit(load_filter))
347+
gt_phased = np.zeros_like(gt, dtype=bool)
422348

423349
for name, zarray in root.items():
424350
if (
425351
name.startswith("call_")
426352
and not name.startswith("call_genotype")
427353
and num_samples != 0
428354
):
429-
futures.append(executor.submit(load_format_field, name, zarray))
355+
vcf_name = name[len("call_") :]
356+
format_fields[vcf_name] = get_vchunk_array(
357+
zarray, v_chunk, v_mask_chunk, samples_selection
358+
)
430359
if num_samples is None:
431360
num_samples = zarray.shape[1]
432361
elif name.startswith("variant_") and name not in RESERVED_VARIABLE_NAMES:
433-
futures.append(executor.submit(load_info_field, name, zarray))
362+
vcf_name = name[len("variant_") :]
363+
info_fields[vcf_name] = get_vchunk_array(zarray, v_chunk, v_mask_chunk)
434364

435-
futures.append(executor.submit(load_gt))
436-
futures.append(executor.submit(load_gt_phased))
437365
if preceding_future:
438-
futures.append(preceding_future)
439-
concurrent.futures.wait(futures)
366+
concurrent.futures.wait((preceding_future,))
440367

441368
ref = alleles[:, 0].astype("S")
442369
alt = alleles[:, 1:].astype("S")

0 commit comments

Comments
 (0)