77from sqlalchemy .orm import Session
88
99from mavedb .lib .logging .context import logging_context , save_to_logging_context
10+ from mavedb .lib .utils import batched
1011from mavedb .db .athena import engine as athena_engine
1112from mavedb .models .gnomad_variant import GnomADVariant
1213from mavedb .models .mapped_variant import MappedVariant
@@ -67,7 +68,9 @@ def allele_list_from_list_like_string(alleles_string: str) -> list[str]:
6768
6869def gnomad_variant_data_for_caids (caids : Sequence [str ]) -> Sequence [Row [Any ]]: # pragma: no cover
6970 """
70- Fetches variant rows from the gnomAD table for a list of CAIDs.
71+ Fetches variant rows from the gnomAD table for a list of CAIDs. Athena limits query strings to 262144 bytes.
72+ Each CAID adds roughly 16 characters to the query: about 12 for the identifier on average, plus two quotes, a
73+ separating comma, and one character of headroom. The list is therefore queried in chunks of 260000/16=16250 CAIDs, which keeps each query under the limit for typically sized CAIDs (an estimate based on average length, not a hard guarantee).
7174
7275 Args:
7376 caids (list[str]): A list of CAIDs (Canonical Allele Identifiers) to query.
@@ -87,36 +90,45 @@ def gnomad_variant_data_for_caids(caids: Sequence[str]) -> Sequence[Row[Any]]:
8790 Raises:
8891 sqlalchemy.exc.SQLAlchemyError: If there is an error executing the query.
8992 """
90-
91- caid_str = "," .join (f"'{ caid } '" for caid in caids )
92- athena_query = f"""
93- SELECT
94- "locus.contig",
95- "locus.position",
96- "alleles",
97- "caid",
98- "joint.freq.all.ac",
99- "joint.freq.all.an",
100- "joint.fafmax.faf95_max_gen_anc",
101- "joint.fafmax.faf95_max"
102- FROM
103- { gnomad_table_name ()}
104- WHERE
105- caid IN ({ caid_str } )
106- """
107-
108- save_to_logging_context ({"num_caids" : len (caids )})
109- logger .debug (msg = f"Fetching gnomAD variants from Athena with query:\n { athena_query } " , extra = logging_context ())
93+ chunked_caids = batched (caids , 16250 )
94+ caid_strs = ["," .join (f"'{ caid } '" for caid in chunk ) for chunk in chunked_caids ]
95+ save_to_logging_context ({"num_caids" : len (caids ), "num_chunks" : len (caid_strs )})
11096
11197 with athena_engine .connect () as athena_connection :
11298 logger .debug (msg = "Connected to Athena" , extra = logging_context ())
113- result = athena_connection .execute (text (athena_query ))
114- rows = result .fetchall ()
11599
116- save_to_logging_context ({"num_gnomad_variant_rows_fetched" : len (rows )})
117- logger .debug (msg = "Done fetching gnomAD variants from Athena" , extra = logging_context ())
100+ result_rows : list [Row [Any ]] = []
101+ for chunk_index , caid_str in enumerate (caid_strs ):
102+ athena_query = f"""
103+ SELECT
104+ "locus.contig",
105+ "locus.position",
106+ "alleles",
107+ "caid",
108+ "joint.freq.all.ac",
109+ "joint.freq.all.an",
110+ "joint.fafmax.faf95_max_gen_anc",
111+ "joint.fafmax.faf95_max"
112+ FROM
113+ { gnomad_table_name ()}
114+ WHERE
115+ caid IN ({ caid_str } )
116+ """
117+ logger .debug (
118+ msg = f"Fetching gnomAD variants from Athena (batch { chunk_index } ) with query:\n { athena_query } " ,
119+ extra = logging_context (),
120+ )
121+
122+ result = athena_connection .execute (text (athena_query ))
123+ rows = result .fetchall ()
124+ result_rows .extend (rows )
125+
126+ logger .debug (f"Fetched { len (rows )} gnomAD variants from Athena (batch { chunk_index } )." )
127+
128+ save_to_logging_context ({"num_gnomad_variant_rows_fetched" : len (result_rows )})
129+ logger .debug (msg = "Done fetching gnomAD variants from Athena" , extra = logging_context ())
118130
119- return rows
131+ return result_rows
120132
121133
122134def link_gnomad_variants_to_mapped_variants (
0 commit comments