Skip to content

Commit 44700dc

Browse files
committed
Working commit for the cache seeding infrastructure
1 parent 9651f26 commit 44700dc

File tree

1 file changed

+392
-0
lines changed

1 file changed

+392
-0
lines changed
Lines changed: 392 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,392 @@
1+
import argparse
2+
import asyncio
3+
import pathlib
4+
import sqlite3
5+
from typing import AsyncGenerator, Callable, Union
6+
7+
import elasticsearch
8+
from elasticsearch import helpers
9+
10+
import biothings_client
11+
12+
import biothings_annotator
13+
from biothings_client.client.asynchronous import AsyncBiothingClient
14+
15+
16+
async def lookup_gene_curie_id(client: AsyncBiothingClient, connection: sqlite3.Connection) -> None:
    """
    Builds the gene CURIE identifiers and stores them in the `gene_curie` table

    Every record produced by the `fetch_all` query is mapped onto CURIE
    identifier(s) using the first namespace present on the record, in the
    following priority order:
        1) entrezgene         -> NCBIGene:<id>
        2) ensembl.gene       -> ENSEMBL:<id>
        3) uniprot.Swiss-Prot -> UniProtKB:<id>

    The identifiers are handed to `_handle_curie_storage`, which performs the
    batched inserts through `connection`
    """
    print("Looking up all gene CURIE identifiers")

    async def gene_curie_builder(client: AsyncBiothingClient) -> AsyncGenerator[str, None]:
        gene_generator = await client.query(
            "__all__", fields=["entrezgene", "ensembl.gene", "uniprot.Swiss-Prot"], fetch_all=True
        )

        async for gene_result in gene_generator:
            if gene_result.get("entrezgene", None) is not None:
                yield f"NCBIGene:{gene_result['entrezgene']}"
            elif gene_result.get("ensembl", None) is not None:
                ensembl = gene_result["ensembl"]
                # The ensembl entry is handled both as a single mapping and as
                # a list of mappings; normalise to a list so both shapes share
                # the same lookup of the "gene" key
                if isinstance(ensembl, dict):
                    ensembl_entries = [ensembl]
                elif isinstance(ensembl, list):
                    ensembl_entries = ensembl
                else:
                    ensembl_entries = []

                for ensembl_entry in ensembl_entries:
                    if ensembl_entry.get("gene", None) is not None:
                        yield f"ENSEMBL:{ensembl_entry['gene']}"
            elif gene_result.get("uniprot", None) is not None:
                uniprot = gene_result["uniprot"]
                swiss_prot = uniprot.get("Swiss-Prot", None)
                if swiss_prot:
                    # NOTE(review): a list of accessions is handled defensively so
                    # it is never formatted as a python list -- confirm against the API
                    accessions = swiss_prot if isinstance(swiss_prot, list) else [swiss_prot]
                    for accession in accessions:
                        yield f"UniProtKB:{accession}"

    await _handle_curie_storage("gene", "gene_curie", gene_curie_builder, client, connection)
42+
43+
44+
async def lookup_chem_curie_id(client: AsyncBiothingClient, connection: sqlite3.Connection) -> None:
    """
    Builds the chem CURIE identifiers and stores them in the `chem_curie` table

    Every record produced by the `fetch_all` query is mapped onto CURIE
    identifier(s) using the first namespace present on the record, in the
    following priority order:
        1) pubchem.cid               -> PUBCHEM.COMPOUND:<id>
        2) chebi.id                  -> CHEBI:<id>
        3) chembl.molecule_chembl_id -> CHEMBL.COMPOUND:<id>
        4) inchikey                  -> INCHIKEY:<id>
        5) unii.unii                 -> UNII:<id>

    Only the first namespace found on a record is used, even when it
    produces no identifier. The identifiers are handed to
    `_handle_curie_storage`, which performs the batched inserts through
    `connection`
    """
    print("Looking up all chem CURIE identifiers")

    # (record key, nested identifier field, CURIE prefix) in priority order.
    # A nested field of None means the record value itself is the identifier
    chem_sources = (
        ("pubchem", "cid", "PUBCHEM.COMPOUND"),
        ("chebi", "id", "CHEBI"),
        ("chembl", "molecule_chembl_id", "CHEMBL.COMPOUND"),
        ("inchikey", None, "INCHIKEY"),
        ("unii", "unii", "UNII"),
    )

    def build_curie(prefix: str, identifier) -> str:
        # NOTE(review): chebi.id presumably already carries the "CHEBI:" prefix;
        # the guard avoids producing "CHEBI:CHEBI:<id>" -- confirm against the API
        identifier = str(identifier)
        if identifier.startswith(f"{prefix}:"):
            return identifier
        return f"{prefix}:{identifier}"

    async def chem_curie_builder(client: AsyncBiothingClient) -> AsyncGenerator[str, None]:
        chem_generator = await client.query(
            "__all__",
            fields=["pubchem.cid", "chebi.id", "chembl.molecule_chembl_id", "inchikey", "unii.unii"],
            fetch_all=True,
        )
        async for chem_result in chem_generator:
            for source_key, identifier_field, prefix in chem_sources:
                source = chem_result.get(source_key, None)
                if source is None:
                    continue

                if identifier_field is None:
                    yield build_curie(prefix, source)
                else:
                    # The nested entry is either one mapping or a list of mappings
                    if isinstance(source, dict):
                        entries = [source]
                    elif isinstance(source, list):
                        entries = source
                    else:
                        entries = []

                    for entry in entries:
                        identifier = entry.get(identifier_field, None)
                        if identifier is not None:
                            yield build_curie(prefix, identifier)

                # The first namespace present wins; lower priorities are skipped
                break

    await _handle_curie_storage("chem", "chem_curie", chem_curie_builder, client, connection)
99+
100+
101+
async def lookup_disease_curie_id(client: AsyncBiothingClient, connection: sqlite3.Connection) -> None:
    """
    Builds the disease CURIE identifiers and stores them in the
    `disease_curie` table

    For every record produced by the `fetch_all` query the first source
    present is used, in the order disgenet, disease_ontology, mondo. From the
    `xrefs` of that source the first cross reference present is emitted, in
    the order mondo, doid, hp. The cross reference values are emitted as
    they are, without adding any prefix

    A cross reference holding a list emits one identifier per list element
    for every source, so a list is never handed to the sqlite3 insert. The
    identifiers are handed to `_handle_curie_storage`, which performs the
    batched inserts through `connection`
    """
    print("Looking up all disease CURIE identifiers")

    source_priority = ("disgenet", "disease_ontology", "mondo")
    xref_priority = ("mondo", "doid", "hp")

    async def disease_curie_builder(disease_client: AsyncBiothingClient) -> AsyncGenerator[str, None]:
        disease_generator = await disease_client.query(
            "__all__", fields=["disease_ontology.xrefs", "disgenet.xrefs", "mondo.xrefs"], fetch_all=True
        )
        async for disease_result in disease_generator:
            for source_key in source_priority:
                source = disease_result.get(source_key, None)
                if source is None:
                    continue

                xrefs = source.get("xrefs", None)
                if xrefs is not None:
                    for xref_key in xref_priority:
                        reference = xrefs.get(xref_key, None)
                        if reference is None:
                            continue

                        if isinstance(reference, list):
                            for reference_entry in reference:
                                yield reference_entry
                        else:
                            yield reference

                        # Only the highest priority cross reference is used
                        break

                # Only the highest priority source is used
                break

    await _handle_curie_storage("disease", "disease_curie", disease_curie_builder, client, connection)
150+
151+
152+
async def _handle_curie_storage(
    data_identifier: str,
    database_table: str,
    builder: Callable,
    client: "AsyncBiothingClient",
    connection: sqlite3.Connection,
) -> None:
    """
    Stores the CURIE identifiers produced by `builder(client)` in the
    sqlite3 table `database_table`

    Iterates over the asynchronous generator returned by `builder(client)`
    and inserts the identifiers in batches of 10000 entries. The database is
    committed after every 100 batches and once more after the last batch

    `data_identifier` is only used to label the progress messages

    The connection is always closed before returning, also when the builder
    or an insert raises. In that case the error is propagated and the inserts
    since the last commit are not committed
    """
    batch_size = 10000
    batches_per_commit = 100
    insert_statement = f"INSERT into {database_table}(curie) values (?)"

    curie_storage = []
    curie_batch = 0
    try:
        async for curie_entry in builder(client):
            curie_storage.append((curie_entry,))
            if len(curie_storage) >= batch_size:
                connection.executemany(insert_statement, curie_storage)
                print(f"{data_identifier} batch #{curie_batch} completed | size: {len(curie_storage)}")
                curie_batch += 1
                curie_storage = []

                # Interim commit so a long run does not hold everything in one transaction
                if curie_batch % batches_per_commit == 0:
                    connection.commit()
                    print(f"curie {data_identifier} database interim commit #{curie_batch // batches_per_commit}")

        if curie_storage:
            connection.executemany(insert_statement, curie_storage)
            print(f"final {data_identifier} batch competed | size: {len(curie_storage)}")

        connection.commit()
    finally:
        connection.close()
185+
186+
187+
async def bulk_generate_curie_id(gene_filter: bool = False, chem_filter: bool = False, disease_filter: bool = False):
    """
    Runs the CURIE identifier lookup for every selected entity type

    For each selected entity (gene, chem, disease) the sqlite3 database
    `<entity>_curie.db` is opened, the `<entity>_curie` table is created when
    missing and an asynchronous biothings client is requested for the entity.
    The matching lookup coroutines are then awaited together
    """

    def lookup_for(entity: str) -> Callable:
        # Resolved on demand so only a selected entity touches its lookup
        lookups = {
            "gene": lookup_gene_curie_id,
            "chem": lookup_chem_curie_id,
            "disease": lookup_disease_curie_id,
        }
        return lookups[entity]

    selections = (("gene", gene_filter), ("chem", chem_filter), ("disease", disease_filter))

    pending_lookups = []
    for entity, selected in selections:
        if not selected:
            continue

        table_name = f"{entity}_curie"
        entity_connection = sqlite3.connect(f"{table_name}.db")
        entity_connection.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY, curie TEXT);")
        entity_client = biothings_client.get_async_client(entity)

        lookup = lookup_for(entity)
        pending_lookups.append(lookup(client=entity_client, connection=entity_connection))

    if pending_lookups:
        await asyncio.gather(*pending_lookups)
218+
219+
220+
async def generate_index(client: elasticsearch.AsyncElasticsearch, index_name: str) -> None:
    """
    Creates the elasticsearch index `index_name` unless it already exists

    The index is created with a single shard, no replicas, a limit of 3000
    mapped fields, the `best_compression` codec, lowercase analyzers and a
    lowercase normalizer, and dynamic mappings enabled
    """
    index_settings = {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "mapping": {"total_fields": {"limit": 3000, "ignore_dynamic_beyond_limit": True}},
    }

    analyzers = {
        # soon deprecated in favor of keyword_lowercase_normalizer
        "string_lowercase": {"tokenizer": "keyword", "filter": "lowercase"},
        "whitespace_lowercase": {"tokenizer": "whitespace", "filter": "lowercase"},
    }

    normalizers = {
        "keyword_lowercase_normalizer": {"filter": ["lowercase"], "type": "custom", "char_filter": []},
    }

    index_configuration = {
        "settings": {
            "index": index_settings,
            "query": {"default_field": "_id,all"},
            "codec": "best_compression",
            "analysis": {"analyzer": analyzers, "normalizer": normalizers},
        },
        "mappings": {"dynamic": "true"},
    }

    index_present = await client.indices.exists(index=index_name)
    if index_present:
        return

    print(f"Creating index: {index_name}")
    await client.indices.create(index=index_name, body=index_configuration)
259+
260+
261+
async def bulk_generate_index() -> None:
    """
    Creates the gene, chem and disease annotator cache indices on the
    elasticsearch instance at http://localhost:9200

    The indices are created one after the other through `generate_index`.
    The elasticsearch client is closed before returning, also when the
    creation of an index fails
    """
    client = elasticsearch.AsyncElasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])
    try:
        for index_name in ("gene-annotator-cache", "chem-annotator-cache", "disease-annotator-cache"):
            await generate_index(client=client, index_name=index_name)
    finally:
        # Release the client connections instead of leaving them to the interpreter exit
        await client.close()
266+
267+
268+
async def reset_indices() -> None:
    """
    Deletes the gene, chem and disease annotator cache indices on the
    elasticsearch instance at http://localhost:9200

    The 400 and 404 response statuses are ignored for the delete requests.
    The elasticsearch client is closed before returning, also when a delete
    request fails
    """
    client = elasticsearch.AsyncElasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])
    try:
        for index_name in ("gene-annotator-cache", "chem-annotator-cache", "disease-annotator-cache"):
            await client.options(ignore_status=[400, 404]).indices.delete(index=index_name)
    finally:
        # Release the client connections instead of leaving them to the interpreter exit
        await client.close()
273+
274+
275+
async def seed_cache_index(
    client: elasticsearch.AsyncElasticsearch,
    index_name: str,
    database_file: Union[str, pathlib.Path],
    database_table: str,
    fields: list[str],
) -> None:
    """
    Fills the elasticsearch index `index_name` with the annotations of the
    CURIE identifiers stored in the sqlite3 table `database_table`

    The identifiers are read from `database_file` in chunks of 10000 rows.
    Every chunk is annotated through `Annotator.annotate_curie_list` with the
    provided `fields`, and every (key, documents) pair of the annotation
    result is indexed as the document `{key: documents}`

    Nothing is read or indexed when the index does not exist
    """
    annotator = biothings_annotator.annotator.Annotator()

    async def curie_database_generator(
        database: Union[str, pathlib.Path], table: str
    ) -> AsyncGenerator[dict, None]:
        chunk_size = 10000
        offset = 0

        connection = sqlite3.connect(database)
        try:
            # One cursor walks the table once; every row is read exactly once
            # without re-scanning the skipped rows for each chunk
            cursor = connection.cursor()
            cursor.execute(f"SELECT curie FROM {table}")

            while True:
                rows = cursor.fetchmany(chunk_size)
                if not rows:
                    return

                curie_id = [row[0].strip() for row in rows]
                annotated_documents = await annotator.annotate_curie_list(
                    curie_list=curie_id, fields=fields, include_extra=True
                )

                for key, documents in annotated_documents.items():
                    yield {key: documents}

                offset += chunk_size
                print(f"Batch offset: {offset}")
        finally:
            # Runs when the table is exhausted and when the generator is closed early
            connection.close()

    if await client.indices.exists(index=index_name):
        async for document in curie_database_generator(database_file, database_table):
            await client.index(index=index_name, document=document)
311+
312+
313+
async def bulk_populate_index(
    gene_filter: bool = False, chem_filter: bool = False, disease_filter: bool = False
) -> None:
    """
    Seeds the annotator cache index of every selected entity type

    For each selected entity (gene, chem, disease) the index
    `<entity>-annotator-cache` is filled from the `<entity>_curie` table of
    the `<entity>_curie.db` database, using the fields configured for the
    entity in the annotator `ANNOTATOR_CLIENTS` settings. The selected
    entities are seeded concurrently

    The elasticsearch client is closed before returning, also when the
    seeding fails
    """
    client = elasticsearch.AsyncElasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])
    try:
        selections = (("gene", gene_filter), ("chem", chem_filter), ("disease", disease_filter))

        seeding_functions = []
        for entity, selected in selections:
            if not selected:
                continue

            entity_fields = biothings_annotator.annotator.settings.ANNOTATOR_CLIENTS[entity]["fields"]
            entity_seeding = seed_cache_index(
                client=client,
                index_name=f"{entity}-annotator-cache",
                database_file=f"{entity}_curie.db",
                database_table=f"{entity}_curie",
                fields=entity_fields,
            )
            seeding_functions.append(entity_seeding)

        if seeding_functions:
            await asyncio.gather(*seeding_functions)
    finally:
        # Release the client connections instead of leaving them to the interpreter exit
        await client.close()
354+
355+
356+
def command_parsing() -> argparse.Namespace:
    """
    Parses the command line arguments

    Top level options (mutually exclusive):
        --generate-index / --no-generate-index -> stored as `genindex`
        --reset-index / --no-reset-index       -> stored as `resetindex`

    Subcommands (stored as `subcommand`): `generate-id` and `fill-index`,
    both accepting the --geneid, --diseaseid and --chemid boolean options
    """
    boolean_flag = argparse.BooleanOptionalAction
    parser = argparse.ArgumentParser()

    index_options = parser.add_mutually_exclusive_group()
    for option, destination in (("--generate-index", "genindex"), ("--reset-index", "resetindex")):
        index_options.add_argument(option, dest=destination, action=boolean_flag)

    subcommands = parser.add_subparsers(dest="subcommand")
    for subcommand_name in ("generate-id", "fill-index"):
        entity_parser = subcommands.add_parser(subcommand_name)
        for entity_option in ("--geneid", "--diseaseid", "--chemid"):
            entity_parser.add_argument(entity_option, action=boolean_flag)

    return parser.parse_args()
375+
376+
377+
def main():
    """
    Entry point of the command line interface

    A subcommand takes precedence over the top level options:
        generate-id -> bulk_generate_curie_id
        fill-index  -> bulk_populate_index
    Without a subcommand, --generate-index runs bulk_generate_index and
    --reset-index runs reset_indices. Nothing is run otherwise
    """
    arguments = command_parsing()
    subcommand = arguments.subcommand

    if subcommand is None:
        if arguments.genindex:
            asyncio.run(bulk_generate_index())
        elif arguments.resetindex:
            asyncio.run(reset_indices())
        return

    subcommand_runners = {
        "generate-id": bulk_generate_curie_id,
        "fill-index": bulk_populate_index,
    }
    runner = subcommand_runners.get(subcommand)
    if runner is not None:
        # Positional order expected by both runners: gene, chem, disease
        asyncio.run(runner(arguments.geneid, arguments.chemid, arguments.diseaseid))
389+
390+
391+
# Run the command line interface only when the file is executed as a script
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)