11"""Get a VCF, register its contained variants, and add cohort frequency data to storage"""
22
33import logging
4- import os
4+ from collections import namedtuple
55from collections .abc import Iterator
66from pathlib import Path
77
88import pysam
9- from anyvar .translate .vrs_python import AlleleTranslator
10- from anyvar .utils .types import VrsVariation
11- from ga4gh .vrs .dataproxy import create_dataproxy
9+ from ga4gh .va_spec .base import CohortAlleleFrequencyStudyResult
1210
1311from anyvlm .anyvar .base_client import BaseAnyVarClient
14- from anyvlm .schemas .domain import AlleleFrequencyAnnotation
1512
1613_logger = logging .getLogger (__name__ )
1714
1815
19- _Var_Af_Pair = tuple [ VrsVariation , AlleleFrequencyAnnotation ]
16+ AfData = namedtuple ( "AfData" , ( "ac" , "an" , "ac_het" , "ac_hom" , "ac_hemi" ))
2017
2118
22- def _yield_var_af_batches (
23- vcf : pysam .VariantFile ,
24- translator : AlleleTranslator ,
25- assembly : str ,
26- batch_size : int = 1000 ,
27- ) -> Iterator [_Var_Af_Pair ]:
28- """Generate a variant-allele frequency data pairing, one at a time
19+ def _yield_expression_af_batches (
20+ vcf : pysam .VariantFile , batch_size : int = 1000
21+ ) -> Iterator [list [tuple [str , CohortAlleleFrequencyStudyResult ]]]:
22+ """Generate a variant expression-allele frequency data pairing, one at a time
2923
3024 :param vcf: VCF to pull variants from
3125 :param translator: VRS-Python variant translator for converting VCF expressions to VRS
3226 :param assembly: name of reference assembly used by VCF
3327 :param batch_size: size of return batches
28+ :return: iterator of lists of pairs of variant expressions and AF data classes
3429 """
35- batch : list [ _Var_Af_Pair ] = []
30+ batch = []
3631
3732 for record in vcf :
3833 for i , alt in enumerate (record .alts or []):
3934 if record .ref is None or "*" in record .ref or "*" in alt :
4035 _logger .warning ("Skipping missing allele at %s" , record )
4136 continue
4237 expression = f"{ record .chrom } -{ record .pos } -{ record .ref } -{ alt } "
43- vrs_variation = translator .translate_from (
44- expression , "gnomad" , assembly_name = assembly
45- )
46- af = AlleleFrequencyAnnotation (
38+ af = AfData (
4739 ac = record .info ["AC" ][i ],
4840 an = record .info ["AN" ],
4941 ac_het = record .info ["AC_Het" ][i ],
5042 ac_hom = record .info ["AC_Hom" ][i ],
5143 ac_hemi = record .info ["AC_Hemi" ][i ],
5244 )
53- batch .append ((vrs_variation , af ))
45+ batch .append ((expression , af ))
5446 if len (batch ) >= batch_size :
5547 yield batch
5648 batch = []
@@ -74,14 +66,12 @@ def ingest_vcf(vcf_path: Path, av: BaseAnyVarClient, assembly: str = "GRCh38") -
7466 :param av: AnyVar client
7567 :param assembly: reference assembly used by VCF
7668 """
77- dataproxy = create_dataproxy (
78- os .environ .get ("SEQREPO_DATAPROXY_URI" , "seqrepo+http://localhost:5000/seqrepo" )
79- )
80- translator = AlleleTranslator (dataproxy )
8169 vcf = pysam .VariantFile (filename = vcf_path .absolute ().as_uri (), mode = "r" )
8270
83- for batch in _yield_var_af_batches (vcf , translator , assembly ):
84- variants = [v for v , _ in batch ]
85- av .put_objects (variants )
86- for variant , af in batch : # noqa: B007
87- pass # make a call to a storage class to store frequency data -- see issue 23
71+ for batch in _yield_expression_af_batches (vcf ):
72+ expressions , afs = zip (* batch , strict = True )
73+ variant_ids = av .put_allele_expressions (expressions , assembly )
74+ for variant_id , af in zip (variant_ids , afs , strict = True ): # noqa: B007
75+ if variant_id is None :
76+ continue
77+ # put af object here
0 commit comments