Skip to content

Commit 5729138

Browse files
tomwhitejeromekelleher
authored andcommitted
Use chunk_data rather than read from Zarr in view
1 parent 41dfd9f commit 5729138

File tree

1 file changed

+34
-133
lines changed

1 file changed

+34
-133
lines changed

vcztools/vcf_writer.py

Lines changed: 34 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,6 @@
88
import numpy as np
99
import zarr
1010

11-
from vcztools.regions import (
12-
parse_regions,
13-
parse_targets,
14-
regions_to_chunk_indexes,
15-
regions_to_selection,
16-
)
1711
from vcztools.samples import parse_samples
1812
from vcztools.utils import (
1913
open_file_like,
@@ -145,10 +139,10 @@ def write_vcf(
145139
samples, all_samples, force_samples=force_samples
146140
)
147141

148-
filter_expr = filter_mod.FilterExpression(
142+
# Need to try parsing filter expressions before writing header
143+
filter_mod.FilterExpression(
149144
field_names=set(root), include=include, exclude=exclude
150145
)
151-
reader = retrieval.VariantChunkReader(root)
152146

153147
if not no_header:
154148
original_header = root.attrs.get("vcf_header", None)
@@ -165,109 +159,30 @@ def write_vcf(
165159
if header_only:
166160
return
167161

168-
pos = root["variant_position"]
169162
contigs = root["contig_id"][:].astype("S")
170163
filters = root["filter_id"][:].astype("S")
171164

172-
if variant_regions is None and variant_targets is None:
173-
# no regions or targets selected
174-
for v_chunk in range(pos.cdata_shape[0]):
175-
chunk_data = reader[v_chunk]
176-
v_mask_chunk = filter_expr.evaluate(chunk_data)
177-
# NOTE: we need to update c_chunk_to_vcf to accept the chunk_data array
178-
c_chunk_to_vcf(
179-
root,
180-
v_chunk,
181-
v_mask_chunk,
182-
samples_selection,
183-
contigs,
184-
filters,
185-
output,
186-
drop_genotypes=drop_genotypes,
187-
no_update=no_update,
188-
)
189-
else:
190-
contigs_u = root["contig_id"][:].astype("U").tolist()
191-
regions = parse_regions(variant_regions, contigs_u)
192-
targets, complement = parse_targets(variant_targets, contigs_u)
193-
194-
# Use the region index to find the chunks that overlap specfied regions or
195-
# targets
196-
region_index = root["region_index"][:]
197-
chunk_indexes = regions_to_chunk_indexes(
198-
regions,
199-
targets,
200-
complement,
201-
region_index,
165+
for chunk_data in retrieval.variant_chunk_iter(
166+
root,
167+
variant_regions=variant_regions,
168+
variant_targets=variant_targets,
169+
include=include,
170+
exclude=exclude,
171+
samples_selection=samples_selection,
172+
):
173+
c_chunk_to_vcf(
174+
chunk_data,
175+
samples_selection,
176+
contigs,
177+
filters,
178+
output,
179+
drop_genotypes=drop_genotypes,
180+
no_update=no_update,
202181
)
203182

204-
# Then use only load required variant_contig/position chunks
205-
if len(chunk_indexes) == 0:
206-
# no chunks - no variants to write
207-
return
208-
elif len(chunk_indexes) == 1:
209-
# single chunk
210-
block_sel = chunk_indexes[0]
211-
else:
212-
# zarr.blocks doesn't support int array indexing - use that when it does
213-
block_sel = slice(chunk_indexes[0], chunk_indexes[-1] + 1)
214-
215-
region_variant_contig = root["variant_contig"].blocks[block_sel][:]
216-
region_variant_position = root["variant_position"].blocks[block_sel][:]
217-
region_variant_length = root["variant_length"].blocks[block_sel][:]
218-
219-
# Find the final variant selection
220-
variant_selection = regions_to_selection(
221-
regions,
222-
targets,
223-
complement,
224-
region_variant_contig,
225-
region_variant_position,
226-
region_variant_length,
227-
)
228-
variant_mask = np.zeros(region_variant_position.shape[0], dtype=bool)
229-
variant_mask[variant_selection] = 1
230-
# Use zarr arrays to get mask chunks aligned with the main data
231-
# for convenience.
232-
z_variant_mask = zarr.array(variant_mask, chunks=pos.chunks[0])
233-
for i, v_chunk in enumerate(chunk_indexes):
234-
chunk_data = reader[v_chunk]
235-
v_mask_chunk = z_variant_mask.blocks[i]
236-
# NOTE: Skipping optimisations while refactor is ongoing
237-
v_mask_chunk = np.logical_and(
238-
v_mask_chunk, filter_expr.evaluate(chunk_data)
239-
)
240-
if np.any(v_mask_chunk):
241-
c_chunk_to_vcf(
242-
root,
243-
v_chunk,
244-
v_mask_chunk,
245-
samples_selection,
246-
contigs,
247-
filters,
248-
output,
249-
drop_genotypes=drop_genotypes,
250-
no_update=no_update,
251-
)
252-
253-
254-
def get_vchunk_array(zarray, v_chunk, mask, samples_selection=None):
255-
v_chunksize = zarray.chunks[0]
256-
start = v_chunksize * v_chunk
257-
end = v_chunksize * (v_chunk + 1)
258-
if samples_selection is None:
259-
result = zarray[start:end]
260-
else:
261-
result = zarray.oindex[start:end, samples_selection]
262-
if mask is not None:
263-
result = result[mask]
264-
return result
265-
266183

267184
def c_chunk_to_vcf(
268-
root,
269-
v_chunk,
270-
v_mask_chunk,
185+
chunk_data,
271186
samples_selection,
272187
contigs,
273188
filters,
@@ -277,60 +192,46 @@ def c_chunk_to_vcf(
277192
no_update,
278193
):
279194
# TODO check we don't truncate silently by doing this
280-
pos = get_vchunk_array(root["variant_position"], v_chunk, v_mask_chunk).astype(
281-
np.int32
282-
)
195+
pos = chunk_data["variant_position"].astype(np.int32)
283196
num_variants = len(pos)
284197
if num_variants == 0:
285198
return ""
286-
chrom = contigs[get_vchunk_array(root["variant_contig"], v_chunk, v_mask_chunk)]
287-
id = get_vchunk_array(root["variant_id"], v_chunk, v_mask_chunk).astype("S")
288-
alleles = get_vchunk_array(root["variant_allele"], v_chunk, v_mask_chunk)
289-
qual = get_vchunk_array(root["variant_quality"], v_chunk, v_mask_chunk)
290-
filter_ = get_vchunk_array(root["variant_filter"], v_chunk, v_mask_chunk)
199+
chrom = contigs[chunk_data["variant_contig"]]
200+
id = chunk_data["variant_id"].astype("S")
201+
alleles = chunk_data["variant_allele"]
202+
qual = chunk_data["variant_quality"]
203+
filter_ = chunk_data["variant_filter"]
291204
format_fields = {}
292205
info_fields = {}
293206
num_samples = len(samples_selection) if samples_selection is not None else None
294207
gt = None
295208
gt_phased = None
296209

297-
if "call_genotype" in root and not drop_genotypes:
298-
if samples_selection is not None and num_samples != 0:
299-
gt = get_vchunk_array(
300-
root["call_genotype"], v_chunk, v_mask_chunk, samples_selection
301-
)
302-
else:
303-
gt = get_vchunk_array(root["call_genotype"], v_chunk, v_mask_chunk)
210+
if "call_genotype" in chunk_data and not drop_genotypes:
211+
gt = chunk_data["call_genotype"]
304212

305213
if (
306-
"call_genotype_phased" in root
214+
"call_genotype_phased" in chunk_data
307215
and not drop_genotypes
308216
and (samples_selection is None or num_samples != 0)
309217
):
310-
gt_phased = get_vchunk_array(
311-
root["call_genotype_phased"],
312-
v_chunk,
313-
v_mask_chunk,
314-
samples_selection,
315-
)
218+
gt_phased = chunk_data["call_genotype_phased"]
316219
else:
317220
gt_phased = np.zeros_like(gt, dtype=bool)
318221

319-
for name, zarray in root.arrays():
222+
for name, array in chunk_data.items():
320223
if (
321224
name.startswith("call_")
322225
and not name.startswith("call_genotype")
323226
and num_samples != 0
324227
):
325228
vcf_name = name[len("call_") :]
326-
format_fields[vcf_name] = get_vchunk_array(
327-
zarray, v_chunk, v_mask_chunk, samples_selection
328-
)
229+
format_fields[vcf_name] = array
329230
if num_samples is None:
330-
num_samples = zarray.shape[1]
231+
num_samples = array.shape[1]
331232
elif name.startswith("variant_") and name not in RESERVED_VARIABLE_NAMES:
332233
vcf_name = name[len("variant_") :]
333-
info_fields[vcf_name] = get_vchunk_array(zarray, v_chunk, v_mask_chunk)
234+
info_fields[vcf_name] = array
334235

335236
ref = alleles[:, 0].astype("S")
336237
alt = alleles[:, 1:].astype("S")
@@ -340,7 +241,7 @@ def c_chunk_to_vcf(
340241
if (
341242
not no_update
342243
and samples_selection is not None
343-
and "call_genotype" in root
244+
and "call_genotype" in chunk_data
344245
and not drop_genotypes
345246
):
346247
# Recompute INFO/AC and INFO/AN

0 commit comments

Comments
 (0)