Skip to content

Commit a750ded

Browse files
Simplify vcf_writer by removing concurrency
1 parent 79dbcd2 commit a750ded

File tree

1 file changed

+32
-53
lines changed

1 file changed

+32
-53
lines changed

vcztools/vcf_writer.py

Lines changed: 32 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
import concurrent.futures
21
import functools
32
import io
43
import logging
@@ -218,28 +217,19 @@ def write_vcf(
218217

219218
if variant_regions is None and variant_targets is None:
220219
# no regions or targets selected
221-
with concurrent.futures.ThreadPoolExecutor() as executor:
222-
preceding_future = None
223-
for v_chunk in range(pos.cdata_shape[0]):
224-
v_mask_chunk = (
225-
filter_evaluator(v_chunk) if filter_evaluator else None
226-
)
227-
future = executor.submit(
228-
c_chunk_to_vcf,
229-
root,
230-
v_chunk,
231-
v_mask_chunk,
232-
samples_selection,
233-
contigs,
234-
filters,
235-
output,
236-
drop_genotypes=drop_genotypes,
237-
no_update=no_update,
238-
preceding_future=preceding_future,
239-
)
240-
if preceding_future:
241-
concurrent.futures.wait((preceding_future,))
242-
preceding_future = future
220+
for v_chunk in range(pos.cdata_shape[0]):
221+
v_mask_chunk = filter_evaluator(v_chunk) if filter_evaluator else None
222+
c_chunk_to_vcf(
223+
root,
224+
v_chunk,
225+
v_mask_chunk,
226+
samples_selection,
227+
contigs,
228+
filters,
229+
output,
230+
drop_genotypes=drop_genotypes,
231+
no_update=no_update,
232+
)
243233
else:
244234
contigs_u = root["contig_id"][:].astype("U").tolist()
245235
regions = parse_regions(variant_regions, contigs_u)
@@ -284,32 +274,25 @@ def write_vcf(
284274
# Use zarr arrays to get mask chunks aligned with the main data
285275
# for convenience.
286276
z_variant_mask = zarr.array(variant_mask, chunks=pos.chunks[0])
287-
with concurrent.futures.ThreadPoolExecutor() as executor:
288-
preceding_future = None
289-
for i, v_chunk in enumerate(chunk_indexes):
290-
v_mask_chunk = z_variant_mask.blocks[i]
291-
292-
if filter_evaluator and np.any(v_mask_chunk):
293-
v_mask_chunk = np.logical_and(
294-
v_mask_chunk, filter_evaluator(v_chunk)
295-
)
296-
if np.any(v_mask_chunk):
297-
future = executor.submit(
298-
c_chunk_to_vcf,
299-
root,
300-
v_chunk,
301-
v_mask_chunk,
302-
samples_selection,
303-
contigs,
304-
filters,
305-
output,
306-
drop_genotypes=drop_genotypes,
307-
no_update=no_update,
308-
preceding_future=preceding_future,
309-
)
310-
if preceding_future:
311-
concurrent.futures.wait((preceding_future,))
312-
preceding_future = future
277+
for i, v_chunk in enumerate(chunk_indexes):
278+
v_mask_chunk = z_variant_mask.blocks[i]
279+
280+
if filter_evaluator and np.any(v_mask_chunk):
281+
v_mask_chunk = np.logical_and(
282+
v_mask_chunk, filter_evaluator(v_chunk)
283+
)
284+
if np.any(v_mask_chunk):
285+
c_chunk_to_vcf(
286+
root,
287+
v_chunk,
288+
v_mask_chunk,
289+
samples_selection,
290+
contigs,
291+
filters,
292+
output,
293+
drop_genotypes=drop_genotypes,
294+
no_update=no_update,
295+
)
313296

314297

315298
def get_vchunk_array(zarray, v_chunk, mask, samples_selection=None):
@@ -336,7 +319,6 @@ def c_chunk_to_vcf(
336319
*,
337320
drop_genotypes,
338321
no_update,
339-
preceding_future=None,
340322
):
341323
chrom = contigs[get_vchunk_array(root["variant_contig"], v_chunk, v_mask_chunk)]
342324
# TODO check we don't truncate silently by doing this
@@ -441,9 +423,6 @@ def c_chunk_to_vcf(
441423
zarray = zarray.reshape((num_variants, num_samples, 1))
442424
encoder.add_format_field(name, zarray)
443425

444-
if preceding_future:
445-
concurrent.futures.wait((preceding_future,))
446-
447426
# TODO: (1) make a guess at this based on number of fields and samples,
448427
# and (2) log a DEBUG message when we have to double.
449428
buflen = 1024

0 commit comments

Comments
 (0)