Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tests/test_bcftools_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ def run_vcztools(args: str, expect_error=False) -> tuple[str, str]:
"view --no-version -i 'FILTER~\"VQSRTrancheINDEL99.00to100.00\"'",
"1kg_2020_chrM.vcf.gz"
),
("view --no-version -i 'INFO/AC>2'", "chr22.vcf.gz")
("view --no-version -i 'INFO/AC>2'", "chr22.vcf.gz"),
("view --no-version -i 'F_MISSING<0.05'", "sample.vcf.gz"),
# GT is not in annotations file
("view --no-version -i 'N_MISSING=0'", "1kg_2020_chr20_annotations.bcf"),
],
# This is necessary when trying to run individual tests, as the arguments above
# make for unworkable command lines
Expand Down
34 changes: 33 additions & 1 deletion tests/test_calculate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import numpy as np
import pytest

from vcztools.calculate import REF, SNP, UNCLASSIFIED, get_variant_type
from vcztools.calculate import (
REF,
SNP,
UNCLASSIFIED,
calculate_if_absent,
get_variant_type,
)
from vcztools.constants import INT_FILL, INT_MISSING


@pytest.mark.parametrize(
Expand All @@ -20,3 +28,27 @@
)
def test_get_variant_type(ref, alt, expected_type):
assert get_variant_type(ref, alt) == expected_type


def test_calculate_if_absent():
gt = np.array(
[
[[0, 0], [0, 1], [1, 1]],
[[0, 0], [0, 2], [2, 2]],
[[0, 1], [1, 2], [2, 2]],
[
[INT_MISSING, INT_MISSING],
[INT_MISSING, INT_MISSING],
[INT_FILL, INT_FILL],
],
[[INT_MISSING, INT_MISSING], [0, 3], [INT_FILL, INT_FILL]],
]
)
data = {"call_genotype": gt}

calculate_if_absent(data, "variant_F_MISSING")

np.testing.assert_array_equal(data["variant_N_MISSING"], np.array([0, 0, 0, 3, 2]))
np.testing.assert_array_equal(
data["variant_F_MISSING"], np.array([0, 0, 0, 1, 2 / 3])
)
1 change: 1 addition & 0 deletions tests/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_invalid_expressions(self, parser, expression):
("fisher(INFO/DP4)", filter_mod.UnsupportedFunctionsError),
("fisher(FMT/ADF,FMT/ADR)", filter_mod.UnsupportedFunctionsError),
("N_PASS(GQ>90)", filter_mod.UnsupportedFunctionsError),
("ILEN > 0", filter_mod.UnsupportedCalculatedVariableError),
('TYPE="bnd"', filter_mod.UnsupportedTypeFieldError),
],
)
Expand Down
42 changes: 42 additions & 0 deletions vcztools/calculate.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,45 @@ def calculate_variant_type(variant_allele: np.ndarray) -> np.ndarray:
for j in range(alt.shape[1]):
variant_type[i, j] = get_variant_type(ref[i], alt[i, j])
return variant_type


SUPPORTED_CALCULATED_VARIABLES = ["N_MISSING", "F_MISSING"]
UNSUPPORTED_CALCULATED_VARIABLES = [
"N_ALT",
"N_SAMPLES",
"AC",
"MAC",
"AF",
"MAF",
"AN",
"ILEN",
]


def _n_missing(data: dict[str, np.ndarray]) -> np.ndarray:
gt = data.get("call_genotype", None)
if gt is None:
return np.zeros_like(data["variant_position"])
else:
return np.sum(np.all(gt < 0, axis=-1), axis=1)


def _f_missing(data: dict[str, np.ndarray]) -> np.ndarray:
calculate_if_absent(data, "variant_N_MISSING")
gt = data["call_genotype"]
n_samples = gt.shape[1]
n_missing = data["variant_N_MISSING"]
return n_missing / n_samples


field_to_function = {
"variant_N_MISSING": _n_missing,
"variant_F_MISSING": _f_missing,
}


def calculate_if_absent(data: dict[str, np.ndarray], vcz_name: str) -> None:
"""Populates the array for 'vcz_name' in 'data' if not already present."""
if vcz_name not in data:
func = field_to_function[vcz_name]
data[vcz_name] = func(data)
25 changes: 22 additions & 3 deletions vcztools/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
import numpy as np
import pyparsing as pp

from vcztools.calculate import SNP, calculate_variant_type
from vcztools.calculate import (
SNP,
SUPPORTED_CALCULATED_VARIABLES,
UNSUPPORTED_CALCULATED_VARIABLES,
calculate_if_absent,
calculate_variant_type,
)

from .utils import vcf_name_to_vcz_names

Expand Down Expand Up @@ -49,6 +55,11 @@ class UnsupportedArraySubscriptError(UnsupportedFilteringFeatureError):
feature = "Array subscripts"


class UnsupportedCalculatedVariableError(UnsupportedFilteringFeatureError):
issue = "171"
feature = "Calculated variables"


class UnsupportedRegexError(UnsupportedFilteringFeatureError):
issue = "174"
feature = "Regular expressions"
Expand Down Expand Up @@ -121,19 +132,27 @@ def __init__(self, mapper, tokens):
token = tokens[0]
if token == "GT":
raise UnsupportedGenotypeValuesError()
elif token in UNSUPPORTED_CALCULATED_VARIABLES:
raise UnsupportedCalculatedVariableError()
field_names = mapper(token)
if len(field_names) == 0:
raise ValueError(f'the tag "{token}" is not defined')
if token in SUPPORTED_CALCULATED_VARIABLES:
self.field_name = f"variant_{token}"
self.refs = ["variant_position", "call_genotype"]
else:
raise ValueError(f'the tag "{token}" is not defined')
elif len(field_names) == 1:
self.field_name = field_names[0]
logger.debug(f"Mapped {token} to {self.field_name}")
self.refs = [self.field_name]
else:
raise ValueError(
f'ambiguous filtering expression: "{token}", '
f"both INFO/{token} and FORMAT/{token} are defined"
)

def eval(self, data):
calculate_if_absent(data, self.field_name)
value = np.asarray(data[self.field_name])
if self.field_name.startswith("call_") and len(value.shape) > 2:
raise UnsupportedHigherDimensionalFormatFieldsError()
Expand All @@ -143,7 +162,7 @@ def __repr__(self):
return self.field_name

def referenced_fields(self):
return frozenset([self.field_name])
return frozenset(self.refs)


class IndexedIdentifier(EvaluationNode):
Expand Down
5 changes: 4 additions & 1 deletion vcztools/retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ def variant_chunk_index_iter_with_filtering(
if filter_expr.parse_result is None:
filter_expr = None
else:
filter_fields = list(filter_expr.referenced_fields)
# only return referenced fields that are actually present
filter_fields = list(
set.intersection(set(filter_expr.referenced_fields), set(root))
)
filter_fields_reader = VariantChunkReader(root, fields=filter_fields)

for v_chunk, v_mask_chunk in variant_chunk_index_iter(
Expand Down
Loading