Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cool_seq_tool/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(
self.ex_g_coords_mapper = ExonGenomicCoordsMapper(
self.seqrepo_access,
self.uta_db,
self.mane_transcript,
self.mane_transcript_mappings,
self.liftover,
)
135 changes: 96 additions & 39 deletions src/cool_seq_tool/mappers/exon_genomic_coords.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import logging

import polars as pl
from ga4gh.vrs.models import SequenceLocation, SequenceReference
from pydantic import ConfigDict, Field, StrictInt, StrictStr, model_validator

from cool_seq_tool.handlers.seqrepo_access import SeqRepoAccess
from cool_seq_tool.mappers.liftover import LiftOver
from cool_seq_tool.mappers.mane_transcript import ManeTranscript
from cool_seq_tool.schemas import (
AnnotationLayer,
Assembly,
BaseModelForbidExtra,
ServiceMeta,
Expand Down Expand Up @@ -232,6 +235,7 @@ def __init__(
self,
seqrepo_access: SeqRepoAccess,
uta_db: UtaDatabase,
mane_transcript: ManeTranscript,
mane_transcript_mappings: ManeTranscriptMappings,
liftover: LiftOver,
) -> None:
Expand All @@ -257,10 +261,12 @@ def __init__(
:param seqrepo_access: SeqRepo instance to give access to query SeqRepo database
:param uta_db: UtaDatabase instance to give access to query UTA database
:param mane_transcript_mappings: Instance to provide access to ManeTranscriptMappings class
:param mane_transcript: Instance to provide access to the ManeTranscript class
:param liftover: Instance to provide mapping between human genome assemblies
"""
self.seqrepo_access = seqrepo_access
self.uta_db = uta_db
self.mane_transcript = mane_transcript
self.mane_transcript_mappings = mane_transcript_mappings
self.liftover = liftover

Expand Down Expand Up @@ -411,6 +417,7 @@ async def genomic_to_tx_segment(
seg_end_genomic: int | None = None,
transcript: str | None = None,
get_nearest_transcript_junction: bool = False,
try_longest_compatible: bool = True,
gene: str | None = None,
) -> GenomicTxSegService:
"""Get transcript segment data for genomic data, lifted over to GRCh38.
Expand Down Expand Up @@ -456,6 +463,8 @@ async def genomic_to_tx_segment(
following the breakpoint for the 3' end. For the negative strand, adjacent
is defined as the exon following the breakpoint for the 5' end and the exon
preceding the breakpoint for the 3' end.
:param try_longest_compatible: ``True`` if should try longest compatible remaining
if mane transcript was not compatible. ``False`` otherwise.
:param gene: A valid, case-sensitive HGNC symbol. Must be given if no ``transcript``
value is provided.
:param coordinate_type: Coordinate type for ``seg_start_genomic`` and
Expand Down Expand Up @@ -484,6 +493,7 @@ async def genomic_to_tx_segment(
transcript=transcript,
gene=gene,
get_nearest_transcript_junction=get_nearest_transcript_junction,
try_longest_compatible=try_longest_compatible,
is_seg_start=True,
)
if start_tx_seg_data.errors:
Expand All @@ -504,6 +514,7 @@ async def genomic_to_tx_segment(
transcript=transcript,
gene=gene,
get_nearest_transcript_junction=get_nearest_transcript_junction,
try_longest_compatible=try_longest_compatible,
is_seg_start=False,
)
if end_tx_seg_data.errors:
Expand Down Expand Up @@ -734,6 +745,7 @@ async def _genomic_to_tx_segment(
transcript: str | None = None,
gene: str | None = None,
get_nearest_transcript_junction: bool = False,
try_longest_compatible: bool = True,
is_seg_start: bool = True,
) -> GenomicTxSeg:
"""Given genomic data, generate a boundary for a transcript segment.
Expand Down Expand Up @@ -761,6 +773,8 @@ async def _genomic_to_tx_segment(
following the breakpoint for the 3' end. For the negative strand, adjacent
is defined as the exon following the breakpoint for the 5' end and the exon
preceding the breakpoint for the 3' end.
:param try_longest_compatible: ``True`` if should try longest compatible remaining
if mane transcript was not compatible. ``False`` otherwise.
:param is_seg_start: ``True`` if ``genomic_pos`` is where the transcript segment starts.
``False`` if ``genomic_pos`` is where the transcript segment ends.
:return: Data for a transcript segment boundary (inter-residue coordinates)
Expand Down Expand Up @@ -796,37 +810,21 @@ async def _genomic_to_tx_segment(
mane_transcripts = self.mane_transcript_mappings.get_gene_mane_data(
gene
)

if mane_transcripts:
if mane_transcripts and await self.uta_db.validate_mane_transcript_ac(
mane_transcripts
):
transcript = mane_transcripts[0]["RefSeq_nuc"]
else:
# Attempt to find a coding transcript if a MANE transcript
# cannot be found
results = await self.uta_db.get_transcripts(
gene=gene, alt_ac=genomic_ac
)

if not results.is_empty():
transcript = results[0]["tx_ac"][0]
if try_longest_compatible:
transcript = await self._select_optimal_transcript(
genomic_pos, genomic_ac, gene
)
else:
# Run if gene is for a noncoding transcript
query = f"""
SELECT DISTINCT tx_ac
FROM {self.uta_db.schema}.tx_exon_aln_v
WHERE hgnc = '{gene}'
AND alt_ac = '{genomic_ac}'
""" # noqa: S608
result = await self.uta_db.execute_query(query)

if result:
transcript = result[0]["tx_ac"]
else:
return GenomicTxSeg(
errors=[
f"Could not find a transcript for {gene} on {genomic_ac}"
]
)

return GenomicTxSeg(
errors=[
"A MANE transcript either does not exist or was not found in UTA. Please set `try_longest_compatible` to ``True`` to re-run"
]
)
tx_exons = await self._get_all_exon_coords(
tx_ac=transcript, genomic_ac=genomic_ac
)
Expand Down Expand Up @@ -934,6 +932,57 @@ async def _genomic_to_tx_segment(
genomic_ac, genomic_pos, is_seg_start, gene, tx_ac=transcript
)

async def _select_optimal_transcript(
self, genomic_pos: int, genomic_ac: str, gene: str
) -> str:
"""Select the optimal transcript given a genomic position, accession, and gene.
In this case, optimal refers to the transcript that is the best represenative
transcript for this data, given that MANE data does not exist for the
gene or the MANE transcript for the gene does not exist in UTA

:param genomic_pos: Genomic position where the transcript segment starts or ends
(inter-residue based)
:param genomic_ac: Genomic accession
:param gene: Valid, case-sensitive HGNC gene symbol
:return A string representing the optimal transcript given the above data
"""
# Attempt to find a coding transcript if a MANE transcript cannot be found
transcript = None
results = await self.mane_transcript.get_longest_compatible_transcript(
start_pos=genomic_pos,
end_pos=genomic_pos,
start_annotation_layer=AnnotationLayer.GENOMIC,
gene=gene,
alt_ac=genomic_ac,
)
if results:
transcript = results.refseq
else:
# Run if gene is for a noncoding transcript
query = f"""
SELECT *
FROM {self.uta_db.schema}.tx_exon_aln_v
WHERE hgnc = '{gene}'
AND alt_ac = '{genomic_ac}'
""" # noqa: S608
results = await self.uta_db.execute_query(query)
schema = ["tx_ac"]
transcripts = [(r["tx_ac"]) for r in results]
transcripts = pl.DataFrame(
data=transcripts, schema=schema, orient="row"
).unique()
result = self.mane_transcript.get_prioritized_transcripts_from_gene(
transcripts
)

if result:
transcript = result[0]
else:
return GenomicTxSeg(
errors=[f"Could not find a transcript for {gene} on {genomic_ac}"]
)
return transcript

async def _get_grch38_ac_pos(
self, genomic_ac: str, genomic_pos: int, grch38_ac: str | None = None
) -> tuple[str | None, int | None, str | None]:
Expand Down Expand Up @@ -1065,24 +1114,32 @@ async def _get_tx_seg_genomic_metadata(
transcript
:return: Transcript segment data and associated genomic metadata
"""
if tx_ac:
# We should always try to liftover
grch38_ac = await self.uta_db.get_newest_assembly_ac(genomic_ac)
if not grch38_ac:
return GenomicTxSeg(errors=[f"Invalid genomic accession: {genomic_ac}"])
grch38_ac = grch38_ac[0]
else:
# We should always try to liftover
grch38_ac = await self.uta_db.get_newest_assembly_ac(genomic_ac)
if not grch38_ac:
return GenomicTxSeg(errors=[f"Invalid genomic accession: {genomic_ac}"])
grch38_ac = grch38_ac[0]

non_mane_transcript = None

if not tx_ac:
mane_data = self.mane_transcript_mappings.get_gene_mane_data(gene)
if not mane_data:
err_msg = f"Unable to find mane data for {genomic_ac} with position {genomic_pos}"
if gene:
err_msg += f" on gene {gene}"
_logger.warning(err_msg)
return GenomicTxSeg(errors=[err_msg])
if not await self.uta_db.validate_mane_transcript_ac(mane_data):
non_mane_transcript = await self._select_optimal_transcript(
genomic_pos, genomic_ac, gene
)

mane_data = mane_data[0]
tx_ac = mane_data["RefSeq_nuc"]
grch38_ac = mane_data["GRCh38_chr"]
if non_mane_transcript:
tx_ac = non_mane_transcript
else:
mane_data = mane_data[0]
tx_ac = mane_data["RefSeq_nuc"]
grch38_ac = mane_data["GRCh38_chr"]

# Always liftover to GRCh38
genomic_ac, genomic_pos, err_msg = await self._get_grch38_ac_pos(
Expand Down
17 changes: 9 additions & 8 deletions src/cool_seq_tool/mappers/mane_transcript.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ def validate_index(
)[0]
)

def _get_prioritized_transcripts_from_gene(self, df: pl.DataFrame) -> list:
def get_prioritized_transcripts_from_gene(self, df: pl.DataFrame) -> list:
"""Sort and filter transcripts from gene to get priority list

:param df: Data frame containing transcripts from gene
Expand All @@ -651,14 +651,12 @@ def _get_prioritized_transcripts_from_gene(self, df: pl.DataFrame) -> list:
most recent version of a transcript associated with an assembly will be kept
"""
copy_df = df.clone()
copy_df = copy_df.drop("alt_ac").unique()
if "alt_ac" in copy_df.columns:
copy_df = copy_df.drop("alt_ac").unique()
copy_df = copy_df.with_columns(
[
pl.col("tx_ac")
.str.split(".")
.list.get(0)
.str.split("NM_")
.list.get(1)
.str.extract(r"(?:NM_|NR_|NG_)(\d+)", 1)
.cast(pl.Int64)
.alias("ac_no_version_as_int"),
pl.col("tx_ac")
Expand All @@ -673,9 +671,12 @@ def _get_prioritized_transcripts_from_gene(self, df: pl.DataFrame) -> list:
)
copy_df = copy_df.unique(["ac_no_version_as_int"], keep="first")

tx_ac_index = copy_df.columns.index("tx_ac")
copy_df = copy_df.with_columns(
copy_df.map_rows(
lambda x: len(self.seqrepo_access.get_reference_sequence(x[1])[0])
lambda x: len(
self.seqrepo_access.get_reference_sequence(x[tx_ac_index])[0]
)
)
.to_series()
.alias("len_of_tx")
Expand Down Expand Up @@ -796,7 +797,7 @@ def _get_protein_rep(
_logger.warning("Unable to get transcripts from gene %s", gene)
return lcr_result

prioritized_tx_acs = self._get_prioritized_transcripts_from_gene(df)
prioritized_tx_acs = self.get_prioritized_transcripts_from_gene(df)

if mane_transcripts:
# Dont check MANE transcripts since we know that are not compatible
Expand Down
17 changes: 17 additions & 0 deletions src/cool_seq_tool/sources/uta_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,23 @@ async def validate_genomic_ac(self, ac: str) -> bool:
result = await self.execute_query(query)
return result[0][0]

async def validate_mane_transcript_ac(self, mane_transcripts: list[dict]) -> bool:
"""Return whether or not a MANE transcript exists in UTA. This looks at the
first entry in the MANE transcripts list as this is the higher priority
transcript.

:param transcript_list: A list of MANE transcripts
:return ``True`` if transcript accession exists in UTA. ``False`` otherwise
"""
transcript = mane_transcripts[0]["RefSeq_nuc"]
query = f"""
SELECT *
FROM {self.schema}.tx_exon_aln_v
WHERE tx_ac = '{transcript}'
""" # noqa: S608
results = await self.execute_query(query)
return bool(results)

async def get_ac_descr(self, ac: str) -> str | None:
"""Return accession description. This is typically available only for accessions
from older (pre-GRCh38) builds.
Expand Down
9 changes: 9 additions & 0 deletions tests/mappers/test_exon_genomic_coords.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,15 @@ async def test_genomic_to_transcript(test_egc_mapper, tpm3_exon1, tpm3_exon8):
)
genomic_tx_seg_checks(resp, tpm3_exon8)

# Check if transcript exists in UTA. If not, return longest compatible transcript
resp = await test_egc_mapper._genomic_to_tx_segment(
22513522,
genomic_ac="NC_000016.10",
is_seg_start=False,
gene="NPIPB5",
)
assert resp.tx_ac == "NM_001135865.2"


@pytest.mark.asyncio()
async def test_tpm3(
Expand Down
2 changes: 1 addition & 1 deletion tests/mappers/test_mane_transcript.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def get_reference_sequence(ac):
data, schema=["pro_ac", "tx_ac", "alt_ac", "cds_start_i"], orient="row"
)

resp = test_mane_transcript._get_prioritized_transcripts_from_gene(test_df)
resp = test_mane_transcript.get_prioritized_transcripts_from_gene(test_df)
assert resp == ["NM_004333.6", "NM_001374258.2", "NM_001378472.1"]


Expand Down