|
2 | 2 | import logging |
3 | 3 | import re |
4 | 4 |
|
| 5 | +from Bio import Align |
5 | 6 | from Bio.Data.CodonTable import IUPACData |
6 | 7 | from Bio.Seq import Seq |
7 | 8 | from Bio.SeqUtils import seq1 |
|
10 | 11 | from dcd_mapping.exceptions import TxSelectError |
11 | 12 | from dcd_mapping.lookup import ( |
12 | 13 | get_chromosome_identifier, |
13 | | - get_gene_symbol, |
14 | 14 | get_mane_transcripts, |
15 | 15 | get_protein_accession, |
16 | 16 | get_seqrepo, |
|
36 | 36 |
|
37 | 37 |
|
38 | 38 | async def _get_compatible_transcripts( |
39 | | - target_gene: TargetGene, align_result: AlignmentResult |
40 | | -) -> set[str]: |
41 | | - """Acquire transcripts which overlap with all hit subranges |
| 39 | + align_result: AlignmentResult, |
| 40 | +) -> set[tuple[str, str]]: |
| 41 | + """Acquire transcripts and their HGNC symbols which overlap with all hit subranges |
42 | 42 | of an alignment result. |
43 | 43 |
|
44 | 44 | :param metadata: metadata for scoreset |
45 | 45 | :param align_result: output of ``align()`` method |
46 | 46 | :return: Set of compatible transcripts |
47 | 47 | """ |
48 | | - if align_result.chrom.startswith("chr"): |
49 | | - aligned_chrom = align_result.chrom[3:] |
50 | | - else: |
51 | | - aligned_chrom = align_result.chrom |
| 48 | + aligned_chrom = ( |
| 49 | + align_result.chrom[3:] |
| 50 | + if align_result.chrom.startswith("chr") |
| 51 | + else align_result.chrom |
| 52 | + ) |
52 | 53 | chromosome = get_chromosome_identifier(aligned_chrom) |
53 | | - gene_symbol = get_gene_symbol(target_gene) |
54 | | - if not gene_symbol: |
55 | | - msg = ( |
56 | | - f"Unable to find gene symbol for target gene {target_gene.target_gene_name}" |
57 | | - ) |
58 | | - raise TxSelectError(msg) |
59 | | - transcript_matches: set[str] = set() |
| 54 | + |
| 55 | + transcript_matches: set[tuple[str, str]] = set() |
60 | 56 | for hit_range in align_result.hit_subranges: |
61 | | - matches_list = await get_transcripts( |
62 | | - gene_symbol, chromosome, hit_range.start, hit_range.end |
63 | | - ) |
| 57 | + matches_list = await get_transcripts(chromosome, hit_range.start, hit_range.end) |
| 58 | + if not transcript_matches: |
| 59 | + transcript_matches = set(matches_list) |
| 60 | + |
64 | 61 | transcript_matches.intersection_update(matches_list) |
| 62 | + |
65 | 63 | return transcript_matches |
66 | 64 |
|
67 | 65 |
|
| 66 | +def _local_alignment_identity(query: str, ref: str) -> float: |
| 67 | + """Compute local alignment percent identity between two protein sequences. |
| 68 | +
|
| 69 | + Uses Smith-Waterman local alignment with BLOSUM62; gap open -10, gap extend -0.5. |
| 70 | + Returns alignment score, or -1000 if either sequence is empty. |
| 71 | + """ |
| 72 | + if not query or not ref: |
| 73 | + return -1000 |
| 74 | + |
| 75 | + aligner = Align.PairwiseAligner( |
| 76 | + mode="local", |
| 77 | + substitution_matrix=Align.substitution_matrices.load("BLOSUM62"), |
| 78 | + open_gap_score=-10, |
| 79 | + extend_gap_score=-0.5, |
| 80 | + ) |
| 81 | + |
| 82 | + try: |
| 83 | + alignments = aligner.align(query, ref) |
| 84 | + except Exception as e: |
| 85 | + # Do not fallback to approximate similarity; propagate failure |
| 86 | + msg = f"Local alignment failed: {e}" |
| 87 | + raise TxSelectError(msg) from e |
| 88 | + |
| 89 | + if not alignments: |
| 90 | + return -1000 |
| 91 | + |
| 92 | + return alignments[0].score |
| 93 | + |
| 94 | + |
| 95 | +def _choose_most_similar_transcript( |
| 96 | + protein_sequence: str, mane_transcripts: list[TranscriptDescription] |
| 97 | +) -> TranscriptDescription | None: |
| 98 | + """Choose the transcript whose protein reference is most similar to the |
| 99 | + provided sequence. |
| 100 | +
|
| 101 | + Selects the highest similarity; ties keep first encountered (stable). |
| 102 | + """ |
| 103 | + if not mane_transcripts: |
| 104 | + return None |
| 105 | + if len(mane_transcripts) == 1: |
| 106 | + return mane_transcripts[0] |
| 107 | + |
| 108 | + best: TranscriptDescription | None = None |
| 109 | + best_score = -1.0 |
| 110 | + for tx in mane_transcripts: |
| 111 | + ref_seq = get_sequence(tx.refseq_prot) |
| 112 | + score = _local_alignment_identity(protein_sequence, ref_seq) |
| 113 | + if score > best_score: |
| 114 | + best_score = score |
| 115 | + best = tx |
| 116 | + |
| 117 | + return best |
| 118 | + |
| 119 | + |
68 | 120 | def _choose_best_mane_transcript( |
69 | 121 | mane_transcripts: list[ManeDescription], |
70 | 122 | ) -> ManeDescription | None: |
@@ -143,46 +195,77 @@ async def _select_protein_reference( |
143 | 195 | :raise TxSelectError: if no matching MANE transcripts and unable to get UniProt ID/ |
144 | 196 | reference sequence |
145 | 197 | """ |
146 | | - matching_transcripts = await _get_compatible_transcripts(target_gene, align_result) |
| 198 | + matching_transcripts = await _get_compatible_transcripts(align_result) |
147 | 199 |
|
148 | | - if not matching_transcripts: |
149 | | - if not target_gene.target_uniprot_ref: |
150 | | - msg = f"Unable to find matching transcripts for target gene {target_gene.target_gene_name}" |
151 | | - raise TxSelectError(msg) |
152 | | - protein_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) |
153 | | - np_accession = target_gene.target_uniprot_ref.id |
154 | | - ref_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) |
155 | | - if not ref_sequence: |
156 | | - msg = f"Unable to grab reference sequence from uniprot.org for target gene {target_gene.target_gene_name}" |
157 | | - raise TxSelectError(msg) |
158 | | - nm_accession = None |
159 | | - tx_mode = None |
160 | | - else: |
161 | | - mane_transcripts = get_mane_transcripts(matching_transcripts) |
| 200 | + # Map HGNC symbols to their compatible transcripts |
| 201 | + hgnc_to_transcripts: dict[str, list[str]] = {} |
| 202 | + for tx, hgnc in matching_transcripts: |
| 203 | + hgnc_to_transcripts.setdefault(hgnc, []).append(tx) |
| 204 | + |
| 205 | + per_gene_best: list[ManeDescription | TranscriptDescription] = [] |
| 206 | + best_tx: ManeDescription | TranscriptDescription | None = None |
| 207 | + |
| 208 | + # Choose one best transcript per gene (based on MANE priority, falling back to longest) |
| 209 | + for _, transcripts in hgnc_to_transcripts.items(): |
| 210 | + if not transcripts: |
| 211 | + continue |
| 212 | + |
| 213 | + mane_transcripts = get_mane_transcripts(transcripts) |
162 | 214 | best_tx = _choose_best_mane_transcript(mane_transcripts) |
| 215 | + |
163 | 216 | if not best_tx: |
164 | | - best_tx = await _get_longest_compatible_transcript( |
165 | | - list(matching_transcripts) |
166 | | - ) |
167 | | - if not best_tx: |
168 | | - msg = f"Unable to find matching MANE transcripts for target gene {target_gene.target_gene_name}" |
| 217 | + best_tx = await _get_longest_compatible_transcript(transcripts) |
| 218 | + |
| 219 | + if best_tx: |
| 220 | + per_gene_best.append(best_tx) |
| 221 | + |
| 222 | + # If we found any per-gene best candidates, Step 2: choose the most similar among them and |
| 223 | + # select it. |
| 224 | + if per_gene_best: |
| 225 | + if not target_gene.target_sequence: |
| 226 | + msg = f"Unable to find target sequence for target gene {target_gene.target_gene_name}" |
169 | 227 | raise TxSelectError(msg) |
| 228 | + |
| 229 | + protein_sequence = _get_protein_sequence(target_gene.target_sequence) |
| 230 | + best_tx = _choose_most_similar_transcript(protein_sequence, per_gene_best) |
| 231 | + |
| 232 | + # As a fallback, pick the first candidate |
| 233 | + if not best_tx: |
| 234 | + best_tx = per_gene_best[0] |
| 235 | + |
170 | 236 | ref_sequence = get_sequence(best_tx.refseq_prot) |
171 | | - nm_accession = best_tx.refseq_nuc |
172 | | - np_accession = best_tx.refseq_prot |
173 | | - tx_mode = best_tx.transcript_priority |
| 237 | + is_full_match = ref_sequence.find(protein_sequence) != -1 |
| 238 | + start = ref_sequence.find(protein_sequence[:10]) |
| 239 | + |
| 240 | + return TxSelectResult( |
| 241 | + nm=best_tx.refseq_nuc, |
| 242 | + np=best_tx.refseq_prot, |
| 243 | + start=start, |
| 244 | + is_full_match=is_full_match, |
| 245 | + sequence=get_sequence(best_tx.refseq_prot), |
| 246 | + transcript_mode=best_tx.transcript_priority, |
| 247 | + ) |
174 | 248 |
|
175 | | - protein_sequence = _get_protein_sequence(target_gene.target_sequence) |
176 | | - is_full_match = ref_sequence.find(protein_sequence) != -1 |
177 | | - start = ref_sequence.find(protein_sequence[:10]) |
| 249 | + # If we didn't find any suitable transcript, attempt to use a provided UniProt reference |
| 250 | + if not target_gene.target_uniprot_ref: |
| 251 | + msg = f"Unable to find matching transcripts for target gene {target_gene.target_gene_name}" |
| 252 | + raise TxSelectError(msg) |
| 253 | + |
| 254 | + uniprot_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) |
| 255 | + if not uniprot_sequence: |
| 256 | + msg = f"Unable to grab reference sequence from uniprot.org for target gene {target_gene.target_gene_name}" |
| 257 | + raise TxSelectError(msg) |
| 258 | + |
| 259 | + is_full_match = uniprot_sequence.find(protein_sequence) != -1 |
| 260 | + start = uniprot_sequence.find(protein_sequence[:10]) |
178 | 261 |
|
179 | 262 | return TxSelectResult( |
180 | | - nm=nm_accession, |
181 | | - np=np_accession, |
| 263 | + nm=None, |
| 264 | + np=target_gene.target_uniprot_ref.id, |
182 | 265 | start=start, |
183 | 266 | is_full_match=is_full_match, |
184 | 267 | sequence=protein_sequence, |
185 | | - transcript_mode=tx_mode, |
| 268 | + transcript_mode=None, |
186 | 269 | ) |
187 | 270 |
|
188 | 271 |
|
|
0 commit comments