|
1 | 1 | """Select best reference sequence.""" |
2 | 2 | import logging |
3 | 3 | import re |
| 4 | +from difflib import SequenceMatcher |
4 | 5 |
|
5 | 6 | from Bio.Data.CodonTable import IUPACData |
6 | 7 | from Bio.Seq import Seq |
|
10 | 11 | from dcd_mapping.exceptions import TxSelectError |
11 | 12 | from dcd_mapping.lookup import ( |
12 | 13 | get_chromosome_identifier, |
13 | | - get_gene_symbol, |
14 | 14 | get_mane_transcripts, |
15 | 15 | get_protein_accession, |
16 | 16 | get_seqrepo, |
|
36 | 36 |
|
37 | 37 |
|
38 | 38 | async def _get_compatible_transcripts( |
39 | | - target_gene: TargetGene, align_result: AlignmentResult |
40 | | -) -> set[str]: |
41 | | - """Acquire transcripts which overlap with all hit subranges |
| 39 | + align_result: AlignmentResult, |
| 40 | +) -> set[tuple[str, str]]: |
| 41 | + """Acquire transcripts and their HGNC symbols which overlap with all hit subranges |
42 | 42 | of an alignment result. |
43 | 43 |
|
44 | 44 | :param metadata: metadata for scoreset |
45 | 45 | :param align_result: output of ``align()`` method |
46 | 46 | :return: Set of compatible transcripts |
47 | 47 | """ |
48 | | - if align_result.chrom.startswith("chr"): |
49 | | - aligned_chrom = align_result.chrom[3:] |
50 | | - else: |
51 | | - aligned_chrom = align_result.chrom |
| 48 | + aligned_chrom = ( |
| 49 | + align_result.chrom[3:] |
| 50 | + if align_result.chrom.startswith("chr") |
| 51 | + else align_result.chrom |
| 52 | + ) |
52 | 53 | chromosome = get_chromosome_identifier(aligned_chrom) |
53 | | - gene_symbol = get_gene_symbol(target_gene) |
54 | | - if not gene_symbol: |
55 | | - msg = ( |
56 | | - f"Unable to find gene symbol for target gene {target_gene.target_gene_name}" |
57 | | - ) |
58 | | - raise TxSelectError(msg) |
59 | | - transcript_matches: set[str] = set() |
| 54 | + |
| 55 | + transcript_matches: set[tuple[str, str]] = set() |
60 | 56 | for hit_range in align_result.hit_subranges: |
61 | | - matches_list = await get_transcripts( |
62 | | - gene_symbol, chromosome, hit_range.start, hit_range.end |
63 | | - ) |
| 57 | + matches_list = await get_transcripts(chromosome, hit_range.start, hit_range.end) |
| 58 | + if not transcript_matches: |
| 59 | + transcript_matches = set(matches_list) |
| 60 | + |
64 | 61 | transcript_matches.intersection_update(matches_list) |
| 62 | + |
65 | 63 | return transcript_matches |
66 | 64 |
|
67 | 65 |
|
| 66 | +def _percent_similarity(a: str, b: str) -> float: |
| 67 | + """Compute a simple normalized similarity between two sequences. |
| 68 | +
|
| 69 | + Uses substring check (perfect local match) as a fast path; otherwise falls |
| 70 | + back to difflib's `SequenceMatcher` ratio which is robust for short strings |
| 71 | + and small edits. |
| 72 | +
|
| 73 | + :param a: query sequence (typically the provided target protein sequence) |
| 74 | + :param b: reference sequence (transcript protein sequence) |
| 75 | + :return: similarity in [0.0, 1.0] |
| 76 | + """ |
| 77 | + if not a or not b: |
| 78 | + return 0.0 |
| 79 | + if a == b: |
| 80 | + return 1.0 |
| 81 | + # If query is fully contained in reference, treat as perfect local match |
| 82 | + if a in b: |
| 83 | + return 1.0 |
| 84 | + # Otherwise, compute a normalized similarity |
| 85 | + return SequenceMatcher(None, a, b).ratio() |
| 86 | + |
| 87 | + |
| 88 | +def _choose_most_similar_transcript( |
| 89 | + protein_sequence: str, mane_transcripts: list[TranscriptDescription] |
| 90 | +) -> TranscriptDescription | None: |
| 91 | + """Choose the transcript whose protein reference is most similar to the |
| 92 | + provided sequence. |
| 93 | +
|
| 94 | + Selects the highest similarity; ties keep first encountered (stable). |
| 95 | + """ |
| 96 | + if not mane_transcripts: |
| 97 | + return None |
| 98 | + if len(mane_transcripts) == 1: |
| 99 | + return mane_transcripts[0] |
| 100 | + |
| 101 | + best: TranscriptDescription | None = None |
| 102 | + best_score = -1.0 |
| 103 | + for tx in mane_transcripts: |
| 104 | + ref_seq = get_sequence(tx.refseq_prot) |
| 105 | + score = _percent_similarity(protein_sequence, ref_seq) |
| 106 | + if score > best_score: |
| 107 | + best_score = score |
| 108 | + best = tx |
| 109 | + |
| 110 | + return best |
| 111 | + |
| 112 | + |
68 | 113 | def _choose_best_mane_transcript( |
69 | 114 | mane_transcripts: list[ManeDescription], |
70 | 115 | ) -> ManeDescription | None: |
@@ -143,46 +188,77 @@ async def _select_protein_reference( |
143 | 188 | :raise TxSelectError: if no matching MANE transcripts and unable to get UniProt ID/ |
144 | 189 | reference sequence |
145 | 190 | """ |
146 | | - matching_transcripts = await _get_compatible_transcripts(target_gene, align_result) |
| 191 | + matching_transcripts = await _get_compatible_transcripts(align_result) |
147 | 192 |
|
148 | | - if not matching_transcripts: |
149 | | - if not target_gene.target_uniprot_ref: |
150 | | - msg = f"Unable to find matching transcripts for target gene {target_gene.target_gene_name}" |
151 | | - raise TxSelectError(msg) |
152 | | - protein_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) |
153 | | - np_accession = target_gene.target_uniprot_ref.id |
154 | | - ref_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) |
155 | | - if not ref_sequence: |
156 | | - msg = f"Unable to grab reference sequence from uniprot.org for target gene {target_gene.target_gene_name}" |
157 | | - raise TxSelectError(msg) |
158 | | - nm_accession = None |
159 | | - tx_mode = None |
160 | | - else: |
161 | | - mane_transcripts = get_mane_transcripts(matching_transcripts) |
| 193 | + # Map HGNC symbols to their compatible transcripts |
| 194 | + hgnc_to_transcripts: dict[str, list[str]] = {} |
| 195 | + for tx, hgnc in matching_transcripts: |
| 196 | + hgnc_to_transcripts.setdefault(hgnc, []).append(tx) |
| 197 | + |
| 198 | + per_gene_best: list[ManeDescription | TranscriptDescription] = [] |
| 199 | + best_tx: ManeDescription | TranscriptDescription | None = None |
| 200 | + |
| 201 | + # Choose one best transcript per gene (based on MANE priority, falling back to longest) |
| 202 | + for _, transcripts in hgnc_to_transcripts.items(): |
| 203 | + if not transcripts: |
| 204 | + continue |
| 205 | + |
| 206 | + mane_transcripts = get_mane_transcripts(transcripts) |
162 | 207 | best_tx = _choose_best_mane_transcript(mane_transcripts) |
| 208 | + |
163 | 209 | if not best_tx: |
164 | | - best_tx = await _get_longest_compatible_transcript( |
165 | | - list(matching_transcripts) |
166 | | - ) |
167 | | - if not best_tx: |
168 | | - msg = f"Unable to find matching MANE transcripts for target gene {target_gene.target_gene_name}" |
| 210 | + best_tx = await _get_longest_compatible_transcript(transcripts) |
| 211 | + |
| 212 | + if best_tx: |
| 213 | + per_gene_best.append(best_tx) |
| 214 | + |
| 215 | + # If we found any per-gene best candidates, Step 2: choose the most similar among them and |
| 216 | + # select it. |
| 217 | + if per_gene_best: |
| 218 | + if not target_gene.target_sequence: |
| 219 | + msg = f"Unable to find target sequence for target gene {target_gene.target_gene_name}" |
169 | 220 | raise TxSelectError(msg) |
| 221 | + |
| 222 | + protein_sequence = _get_protein_sequence(target_gene.target_sequence) |
| 223 | + best_tx = _choose_most_similar_transcript(protein_sequence, per_gene_best) |
| 224 | + |
| 225 | + # As a fallback, pick the first candidate |
| 226 | + if not best_tx: |
| 227 | + best_tx = per_gene_best[0] |
| 228 | + |
170 | 229 | ref_sequence = get_sequence(best_tx.refseq_prot) |
171 | | - nm_accession = best_tx.refseq_nuc |
172 | | - np_accession = best_tx.refseq_prot |
173 | | - tx_mode = best_tx.transcript_priority |
| 230 | + is_full_match = ref_sequence.find(protein_sequence) != -1 |
| 231 | + start = ref_sequence.find(protein_sequence[:10]) |
| 232 | + |
| 233 | + return TxSelectResult( |
| 234 | + nm=best_tx.refseq_nuc, |
| 235 | + np=best_tx.refseq_prot, |
| 236 | + start=start, |
| 237 | + is_full_match=is_full_match, |
| 238 | + sequence=get_sequence(best_tx.refseq_prot), |
| 239 | + transcript_mode=best_tx.transcript_priority, |
| 240 | + ) |
174 | 241 |
|
175 | | - protein_sequence = _get_protein_sequence(target_gene.target_sequence) |
176 | | - is_full_match = ref_sequence.find(protein_sequence) != -1 |
177 | | - start = ref_sequence.find(protein_sequence[:10]) |
| 242 | + # If we didn't find any suitable transcript, attempt to use a provided UniProt reference |
| 243 | + if not target_gene.target_uniprot_ref: |
| 244 | + msg = f"Unable to find matching transcripts for target gene {target_gene.target_gene_name}" |
| 245 | + raise TxSelectError(msg) |
| 246 | + |
| 247 | + uniprot_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) |
| 248 | + if not uniprot_sequence: |
| 249 | + msg = f"Unable to grab reference sequence from uniprot.org for target gene {target_gene.target_gene_name}" |
| 250 | + raise TxSelectError(msg) |
| 251 | + |
| 252 | + is_full_match = uniprot_sequence.find(protein_sequence) != -1 |
| 253 | + start = uniprot_sequence.find(protein_sequence[:10]) |
178 | 254 |
|
179 | 255 | return TxSelectResult( |
180 | | - nm=nm_accession, |
181 | | - np=np_accession, |
| 256 | + nm=None, |
| 257 | + np=target_gene.target_uniprot_ref.id, |
182 | 258 | start=start, |
183 | 259 | is_full_match=is_full_match, |
184 | 260 | sequence=protein_sequence, |
185 | | - transcript_mode=tx_mode, |
| 261 | + transcript_mode=None, |
186 | 262 | ) |
187 | 263 |
|
188 | 264 |
|
|
0 commit comments