Skip to content

Commit e577e12

Browse files
authored
feat: add VCF ingest function (#31)
1 parent 15c208b commit e577e12

25 files changed

+4342
-294
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies = [
2828
"python-dotenv",
2929
"pydantic-settings",
3030
"requests",
31+
"pysam==0.23.0", # see https://github.com/ga4gh/vrs-python/issues/560
3132
]
3233
dynamic = ["version"]
3334

src/anyvlm/anyvar/base_client.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,42 @@
11
"""Provide abstraction for a AnyVLM-to-AnyVar connection."""
22

33
import abc
4+
from collections.abc import Iterable, Sequence
45

6+
from anyvar.utils.liftover_utils import ReferenceAssembly
57
from anyvar.utils.types import VrsVariation
68

79

810
class AnyVarClientError(Exception):
911
"""Generic client-related exception."""
1012

1113

12-
class UnidentifiedObjectError(AnyVarClientError):
13-
"""Raise if input object lacks an ID property"""
14+
class AnyVarClientConnectionError(AnyVarClientError):
15+
"""Raise for failure to connect to AnyVar client.
16+
17+
Likely relevant only for HTTP-based implementation.
18+
"""
1419

1520

1621
class BaseAnyVarClient(abc.ABC):
1722
"""Interface elements for an AnyVar client"""
1823

1924
@abc.abstractmethod
20-
def put_objects(self, objects: list[VrsVariation]) -> None:
21-
"""Register objects with AnyVar
22-
23-
All input objects must have a populated ID field. A validation check for this is
24-
performed before any variants are registered.
25-
26-
:param objects: variation objects to register
27-
:raise AnyVarClientError: for errors relating to specifics of client interface
28-
:raise UnidentifiedObjectError: if *any* provided object lacks a VRS ID
25+
def put_allele_expressions(
26+
self,
27+
expressions: Iterable[str],
28+
assembly: ReferenceAssembly = ReferenceAssembly.GRCH38,
29+
) -> Sequence[str | None]:
30+
"""Submit allele expressions to an AnyVar instance and retrieve corresponding VRS IDs
31+
32+
Currently, only expressions supported by the VRS-Python translator are supported.
33+
This could change depending on the AnyVar implementation, though, and probably
34+
can't be validated on the AnyVLM side.
35+
36+
:param expressions: variation expressions to register
37+
:param assembly: reference assembly used in variation expressions
38+
:return: list where the i'th item is either the VRS ID if translation succeeds,
39+
else `None`, for the i'th expression
2940
"""
3041

3142
@abc.abstractmethod
@@ -38,7 +49,7 @@ def search_by_interval(
3849
:param start: start position for genomic region
3950
:param end: end position for genomic region
4051
:return: list of matching variant objects
41-
:raise AnyVarClientError: if connection is unsuccessful during search query
52+
:raise AnyVarClientConnectionError: if connection is unsuccessful during search query
4253
"""
4354

4455
@abc.abstractmethod

src/anyvlm/anyvar/http_client.py

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
"""Provide abstraction for a VLM-to-AnyVar connection."""
22

33
import logging
4+
from collections.abc import Iterable, Sequence
5+
from http import HTTPStatus
46

57
import requests
8+
from anyvar.utils.liftover_utils import ReferenceAssembly
69
from anyvar.utils.types import VrsVariation
7-
from ga4gh.vrs import models
10+
from ga4gh.vrs import VrsType, models
811

912
from anyvlm.anyvar.base_client import (
13+
AnyVarClientConnectionError,
1014
AnyVarClientError,
1115
BaseAnyVarClient,
12-
UnidentifiedObjectError,
1316
)
1417

1518
_logger = logging.getLogger(__name__)
@@ -26,38 +29,65 @@ def __init__(
2629
:param hostname: service API root
2730
:param request_timeout: timeout value, in seconds, for HTTP requests
2831
"""
32+
_logger.info("Initializing HTTP-based AnyVar client with hostname %s", hostname)
2933
self.hostname = hostname
3034
self.request_timeout = request_timeout
3135

32-
def put_objects(self, objects: list[VrsVariation]) -> None:
33-
"""Register objects with AnyVar
36+
def put_allele_expressions(
37+
self,
38+
expressions: Iterable[str],
39+
assembly: ReferenceAssembly = ReferenceAssembly.GRCH38,
40+
) -> Sequence[str | None]:
41+
"""Submit allele expressions to an AnyVar instance and retrieve corresponding VRS IDs
3442
35-
All input objects must have a populated ID field. A validation check for this is
36-
performed before any variants are registered.
43+
Currently, only expressions supported by the VRS-Python translator are supported.
44+
This could change depending on the AnyVar implementation, though, and probably
45+
can't be validated on the AnyVLM side.
3746
38-
:param objects: variation objects to register
39-
:return: completed VRS objects
40-
:raise AnyVarClientError: if connection is unsuccessful during registration request
41-
:raise UnidentifiedObjectError: if *any* provided object lacks a VRS ID
47+
:param expressions: variation expressions to register
48+
:param assembly: reference assembly used in expressions
49+
:return: list where the i'th item is either the VRS ID if translation succeeds,
50+
else `None`, for the i'th expression
51+
:raise AnyVarClientError: for unexpected errors relating to specifics of client interface
4252
"""
43-
objects_to_submit = []
44-
for vrs_object in objects:
45-
if not vrs_object.id:
46-
_logger.error("Provided variant %s has no VRS ID", vrs_object)
47-
raise UnidentifiedObjectError
48-
objects_to_submit.append(
49-
vrs_object.model_dump(exclude_none=True, mode="json")
50-
)
51-
for vrs_object in objects_to_submit:
52-
response = requests.put(
53-
f"{self.hostname}/vrs_variation",
54-
json=vrs_object,
55-
timeout=self.request_timeout,
56-
)
53+
results = []
54+
url = f"{self.hostname}/variation"
55+
for expression in expressions:
56+
payload = {
57+
"definition": expression,
58+
"assembly_name": assembly.value,
59+
"input_type": VrsType.ALLELE.value,
60+
}
61+
try:
62+
response = requests.put(
63+
url,
64+
json=payload,
65+
timeout=self.request_timeout,
66+
)
67+
except requests.ConnectionError as e:
68+
_logger.exception(
69+
"Unable to establish connection using AnyVar configured at %s",
70+
self.hostname,
71+
)
72+
raise AnyVarClientConnectionError from e
5773
try:
5874
response.raise_for_status()
5975
except requests.HTTPError as e:
60-
raise AnyVarClientError from e
76+
_logger.exception(
77+
"Encountered HTTP exception submitting payload %s to %s",
78+
payload,
79+
url,
80+
)
81+
if response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY:
82+
_logger.debug(
83+
"Translation failed for variant expression '%s'", expression
84+
)
85+
results.append(None)
86+
else:
87+
raise AnyVarClientError from e
88+
else:
89+
results.append(response.json()["object_id"])
90+
return results
6191

6292
def search_by_interval(
6393
self, accession: str, start: int, end: int
@@ -89,3 +119,6 @@ def close(self) -> None:
89119
90120
This is a no-op for this class.
91121
"""
122+
_logger.info(
123+
"Closing HTTP-based AnyVar client class. This requires no further action."
124+
)

src/anyvlm/anyvar/python_client.py

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
"""Implement AnyVar client interface for direct Python-based access."""
22

33
import logging
4+
from collections.abc import Iterable, Sequence
45

56
from anyvar import AnyVar
67
from anyvar.storage.base_storage import Storage
7-
from anyvar.translate.translate import Translator
8-
from anyvar.utils.types import VrsVariation
8+
from anyvar.translate.translate import TranslationError, Translator
9+
from anyvar.utils.liftover_utils import ReferenceAssembly
10+
from anyvar.utils.types import SupportedVariationType, VrsVariation
11+
from ga4gh.vrs.dataproxy import DataProxyValidationError
912

10-
from anyvlm.anyvar.base_client import BaseAnyVarClient, UnidentifiedObjectError
13+
from anyvlm.anyvar.base_client import BaseAnyVarClient
1114

1215
_logger = logging.getLogger(__name__)
1316

@@ -23,20 +26,41 @@ def __init__(self, translator: Translator, storage: Storage) -> None:
2326
"""
2427
self.av = AnyVar(translator, storage)
2528

26-
def put_objects(self, objects: list[VrsVariation]) -> None:
27-
"""Register objects with AnyVar
28-
29-
All input objects must have a populated ID field. A validation check for this is
30-
performed before any variants are registered.
31-
32-
:param objects: variation objects to register
33-
:raise UnidentifiedObjectError: if *any* provided object lacks a VRS ID
29+
def put_allele_expressions(
30+
self,
31+
expressions: Iterable[str],
32+
assembly: ReferenceAssembly = ReferenceAssembly.GRCH38,
33+
) -> Sequence[str | None]:
34+
"""Submit allele expressions to an AnyVar instance and retrieve corresponding VRS IDs
35+
36+
Currently, only expressions supported by the VRS-Python translator are supported.
37+
This could change depending on the AnyVar implementation, though, and probably
38+
can't be validated on the AnyVLM side.
39+
40+
:param expressions: variation expressions to register
41+
:param assembly: reference assembly used in expressions
42+
:return: list where the i'th item is either the VRS ID if translation succeeds,
43+
else `None`, for the i'th expression
3444
"""
35-
for variant in objects:
36-
if not variant.id:
37-
_logger.error("Provided variant %s has no VRS ID", variant)
38-
raise UnidentifiedObjectError
39-
self.av.put_objects(objects) # type: ignore[reportArgumentType]
45+
results = []
46+
for expression in expressions:
47+
translated_variation = None
48+
try:
49+
translated_variation = self.av.translator.translate_variation(
50+
expression,
51+
assembly=assembly.value,
52+
input_type=SupportedVariationType.ALLELE,
53+
)
54+
except DataProxyValidationError:
55+
_logger.exception("Found invalid base in expression %s", expression)
56+
except TranslationError:
57+
_logger.exception("Failed to translate expression: %s", expression)
58+
if translated_variation:
59+
self.av.put_objects([translated_variation]) # type: ignore
60+
results.append(translated_variation.id) # type: ignore
61+
else:
62+
results.append(None)
63+
return results
4064

4165
def search_by_interval(
4266
self, accession: str, start: int, end: int
@@ -65,4 +89,5 @@ def search_by_interval(
6589

6690
def close(self) -> None:
6791
"""Clean up AnyVar instance."""
92+
_logger.info("Closing AnyVar client.")
6893
self.av.object_store.close()

src/anyvlm/functions/ingest_vcf.py

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,96 @@
1-
"""Get a VCF, register its contained variants, and add cohort frequency data"""
1+
"""Get a VCF, register its contained variants, and add cohort frequency data to storage"""
22

3+
import logging
4+
from collections import namedtuple
5+
from collections.abc import Iterator
36
from pathlib import Path
47

8+
import pysam
9+
from anyvar.utils.liftover_utils import ReferenceAssembly
10+
from ga4gh.va_spec.base import CohortAlleleFrequencyStudyResult
511

6-
def ingest_vcf(vcf_path: Path) -> None:
7-
"""Extract variant and frequency information from a single gVCF
12+
from anyvlm.anyvar.base_client import BaseAnyVarClient
813

9-
For now, we assume they're available under consistently-named INFO fields. In the
10-
future, we could enable custom field names, incrementing on top of existing counts,
11-
etc.
14+
_logger = logging.getLogger(__name__)
15+
16+
17+
AfData = namedtuple("AfData", ("ac", "an", "ac_het", "ac_hom", "ac_hemi"))
18+
19+
20+
class VcfAfColumnsError(Exception):
21+
"""Raise for missing VCF INFO columns that are required for AF ingestion"""
22+
23+
24+
def _yield_expression_af_batches(
25+
vcf: pysam.VariantFile, batch_size: int = 1000
26+
) -> Iterator[list[tuple[str, CohortAlleleFrequencyStudyResult]]]:
27+
"""Generate a variant expression-allele frequency data pairing, one at a time
28+
29+
:param vcf: VCF to pull variants from
30+
:param batch_size: size of return batches
31+
:return: iterator of lists of pairs of variant expressions and AF data classes
32+
"""
33+
batch = []
34+
35+
for record in vcf:
36+
for i, alt in enumerate(record.alts or []):
37+
if record.ref is None or "*" in record.ref or "*" in alt:
38+
_logger.info("Skipping missing allele at %s", record)
39+
continue
40+
expression = f"{record.chrom}-{record.pos}-{record.ref}-{alt}"
41+
try:
42+
af = AfData(
43+
ac=record.info["AC"][i],
44+
an=record.info["AN"],
45+
ac_het=record.info["AC_Het"][i],
46+
ac_hom=record.info["AC_Hom"][i],
47+
ac_hemi=record.info["AC_Hemi"][i],
48+
)
49+
except KeyError as e:
50+
info = record.info
51+
msg = f"One or more required INFO column is missing: {'AC' in info}, {'AN' in info}, {'AC_Het' in info}, {'AC_Hom' in info}, {'AC_Hemi' in info}"
52+
_logger.exception(msg)
53+
raise VcfAfColumnsError(msg) from e
54+
batch.append((expression, af))
55+
if len(batch) >= batch_size:
56+
_logger.debug("Yielding next batch")
57+
yield batch
58+
batch = []
59+
if batch:
60+
yield batch
61+
_logger.debug("Expression/AF generator exhausted")
62+
63+
64+
def ingest_vcf(
65+
vcf_path: Path,
66+
av: BaseAnyVarClient,
67+
assembly: ReferenceAssembly = ReferenceAssembly.GRCH38,
68+
) -> None:
69+
"""Extract variant and frequency information from a single VCF
70+
71+
Current assumptions (subject to change):
72+
* annotations for cohort are provided in 1 file
73+
* INFO fields are named in conformance with convention used here:
74+
* AC (type: A)
75+
* AN (type: 1)
76+
* AC_Het (type: A)
77+
* AC_Hom (type: A)
78+
* AC_Hemi (type: A)
1279
1380
:param vcf_path: location of input file
81+
:param av: AnyVar client
82+
:param assembly: reference assembly used by VCF
83+
:raise VcfAfColumnsError: if VCF is missing required columns
1484
"""
15-
raise NotImplementedError
85+
vcf = pysam.VariantFile(filename=vcf_path.absolute().as_uri(), mode="r")
86+
87+
for batch in _yield_expression_af_batches(vcf):
88+
expressions, afs = zip(*batch, strict=True)
89+
variant_ids = av.put_allele_expressions(expressions, assembly)
90+
for variant_id, af in zip(variant_ids, afs, strict=True): # noqa: B007
91+
if variant_id is None:
92+
continue
93+
# make call to object store method for putting CAF here
94+
# presumably build the CAF object here + call the insert method with it
95+
# may need to alter zip() if the insert method expects a full variation
96+
# instead of just the ID

0 commit comments

Comments
 (0)