Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ var/
venv.bak/
venv/
wheels/

# Dolt database files (added by bd init)
.dolt/
*.db
252 changes: 246 additions & 6 deletions src/hgvs/variantmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@

# ############################################################################
# g⟷n
def g_to_n(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method):

Check failure on line 137 in src/hgvs/variantmapper.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=biocommons_hgvs&issues=AZ0qzPmFP8bj0kFn5Mm2&open=AZ0qzPmFP8bj0kFn5Mm2&pullRequest=823
"""Given a parsed g. variant, return a n. variant on the specified
transcript using the specified alignment method (default is
"splign" from NCBI).
Expand Down Expand Up @@ -176,12 +176,24 @@
pos_n.start.base += 1
pos_n.end.base -= 1
edit_n.ref = ""
elif self._variant_has_internal_gap(mapper, var_g):
edit_n, pos_n = self._get_altered_tx_sequence(
mapper.strand, mapper, var_g.posedit.pos, var_g, pos_n
)
elif self._variant_spans_i_segment(mapper, var_g):
expanded_pos_g = self._expand_pos_g_for_adjacent_gap(mapper, var_g)
edit_n, pos_n = self._get_altered_tx_sequence(
mapper.strand, mapper, expanded_pos_g, var_g, pos_n
)
else:
# variant at alignment gap
pos_g = mapper.n_to_g(pos_n)
edit_n = hgvs.edit.NARefAlt(
ref="", alt=self._get_altered_sequence(mapper.strand, pos_g, var_g)
)
if self._variant_spans_i_segment(mapper, var_g):
edit_n, pos_n = self._get_altered_tx_sequence(mapper.strand, mapper, pos_g, var_g, pos_n)
else:
edit_n = hgvs.edit.NARefAlt(
ref="", alt=self._get_altered_sequence(mapper.strand, pos_g, var_g)
)
pos_n.uncertain = var_g.posedit.pos.uncertain
var_n = hgvs.sequencevariant.SequenceVariant(
ac=tx_ac, type="n", posedit=hgvs.posedit.PosEdit(pos_n, edit_n)
Expand Down Expand Up @@ -281,12 +293,29 @@
pos_c.start.base += 1
pos_c.end.base -= 1
edit_c.ref = ""
elif self._variant_has_internal_gap(mapper, var_g):
# Gap segment lies inside the variant interval; both endpoints are in = regions
# so pos_c.uncertain is False, but the transcript length differs from genomic length.
# Recalculate the transcript edit from sequences to absorb the gap.
edit_c, pos_c = self._get_altered_tx_sequence(
mapper.strand, mapper, var_g.posedit.pos, var_g, pos_c
)
elif self._variant_spans_i_segment(mapper, var_g):
# I-segment is immediately adjacent to the variant interval (not internal, not uncertain).
# The alt allele may cancel with the adjacent I-segment yielding a simpler tx edit.
expanded_pos_g = self._expand_pos_g_for_adjacent_gap(mapper, var_g)
edit_c, pos_c = self._get_altered_tx_sequence(
mapper.strand, mapper, expanded_pos_g, var_g, pos_c
)
else:
# variant at alignment gap
pos_g = mapper.c_to_g(pos_c)
edit_c = hgvs.edit.NARefAlt(
ref="", alt=self._get_altered_sequence(mapper.strand, pos_g, var_g)
)
if self._variant_spans_i_segment(mapper, var_g):
edit_c, pos_c = self._get_altered_tx_sequence(mapper.strand, mapper, pos_g, var_g, pos_c)
else:
edit_c = hgvs.edit.NARefAlt(
ref="", alt=self._get_altered_sequence(mapper.strand, pos_g, var_g)
)
pos_c.uncertain = var_g.posedit.pos.uncertain
var_c = hgvs.sequencevariant.SequenceVariant(
ac=tx_ac, type="c", posedit=hgvs.posedit.PosEdit(pos_c, edit_c)
Expand Down Expand Up @@ -639,6 +668,217 @@
seq = reverse_complement(seq)
return seq

def _variant_spans_i_segment(self, mapper, var_g):
"""Return True if var_g straddles a normal-to-I-segment boundary in the CIGAR.

I-segments are genomic positions with no transcript equivalent. The fix is needed
only when the variant spans *across* an I-segment boundary (one endpoint in a matching
"=" region, the other landing in an I-segment). When the entire variant lies within an
I-segment, the existing uncertain-path logic handles it correctly.
"""
gc_offset = mapper.gc_offset
ref_pos = mapper.cigarmapper.ref_pos
cigar_op = mapper.cigarmapper.cigar_op
# Guard: partially uncertain variants may have Interval positions without .base
try:
start_offset = var_g.posedit.pos.start.base - 1 - gc_offset
end_offset = var_g.posedit.pos.end.base - gc_offset
except AttributeError:
return False

# Determine the CIGAR op at each endpoint
start_op = end_op = None
for i, op in enumerate(cigar_op):
if start_op is None and start_offset < ref_pos[i + 1]:
start_op = op
if end_op is None and end_offset < ref_pos[i + 1]:
end_op = op
if start_op is not None and end_op is not None:
break

# If either endpoint falls outside the alignment, we can't determine the op
if start_op is None or end_op is None:
return False

# Apply fix only when at least one endpoint is in "I" and at least one is not
ops = {start_op, end_op}
return "I" in ops and ops != {"I"}

def _expand_pos_g_for_adjacent_gap(self, mapper, var_g):
"""Return a copy of var_g.posedit.pos expanded to include any adjacent I-segment.

When the variant's interbase end sits at the start of an I-segment (or its
interbase start sits at the end of an I-segment), extend pos_g so that
_get_altered_tx_sequence sees the full I-segment in seq and i_offsets.
"""
gc_offset = mapper.gc_offset
ref_pos = mapper.cigarmapper.ref_pos
cigar_op = mapper.cigarmapper.cigar_op

pos_g = copy.deepcopy(var_g.posedit.pos)

# Check right side: interbase end = var_g.posedit.pos.end.base - gc_offset
end_offset = var_g.posedit.pos.end.base - gc_offset
for i, op in enumerate(cigar_op):
if op == "I" and ref_pos[i] == end_offset:
# I-segment starts exactly at the interbase end of the variant
pos_g.end.base = ref_pos[i + 1] + gc_offset
break

# Check left side: interbase start = var_g.posedit.pos.start.base - 1 - gc_offset
start_offset = var_g.posedit.pos.start.base - 1 - gc_offset
for i, op in enumerate(cigar_op):
if op == "I" and ref_pos[i + 1] == start_offset:
# I-segment ends exactly at the interbase start of the variant
pos_g.start.base = ref_pos[i] + gc_offset + 1
break

return pos_g

def _gap_segments_within_pos_g(self, mapper, pos_g):
"""Return gap segments (I and D CIGAR ops) that fall within genomic interval pos_g.

I-segments: genomic bases with no transcript equivalent.
D-segments: transcript-only positions at an interbase genomic location.

Returns:
dict with keys:
"I": set of 0-based offsets (relative to pos_g.start.base) for I-segment bases
"D": list of (interbase_offset, tgt_start, tgt_end) for each D-segment
where interbase_offset is 0-based relative to pos_g.start.base,
tgt_start/tgt_end are 0-based transcript positions of the D-segment bases
"""
i_offsets = set()
d_segments = []
cigar_ops = mapper.cigarmapper.cigar_op
ref_pos = mapper.cigarmapper.ref_pos
tgt_pos = mapper.cigarmapper.tgt_pos
gc_offset = mapper.gc_offset

for i, op in enumerate(cigar_ops):
if op == "I":
# I advances genome/ref only — absolute genomic positions (1-based, inclusive)
abs_start = ref_pos[i] + gc_offset + 1
abs_end = ref_pos[i + 1] + gc_offset # inclusive
overlap_start = max(abs_start, pos_g.start.base)
overlap_end = min(abs_end, pos_g.end.base)
for p in range(overlap_start, overlap_end + 1):
i_offsets.add(p - pos_g.start.base)
elif op == "D":
# D advances transcript/tgt only — genomic interbase position is ref_pos[i] (same as ref_pos[i+1])
# The D-segment sits at interbase position gc_offset + ref_pos[i]
# It is "inside" pos_g if its genomic interbase position falls strictly between start and end
genomic_interbase = ref_pos[i] + gc_offset # 0-based interbase genomic position
# pos_g uses 1-based inclusive; interbase is between pos_g.start.base-1 and pos_g.start.base
# A D-segment is inside pos_g if: pos_g.start.base <= genomic_interbase+1 <= pos_g.end.base
# i.e., it sits between two bases both within pos_g
if pos_g.start.base <= genomic_interbase < pos_g.end.base:
interbase_offset = genomic_interbase - (pos_g.start.base - 1) # offset relative to pos_g start
d_segments.append((interbase_offset, tgt_pos[i], tgt_pos[i + 1]))

return {"I": i_offsets, "D": d_segments}

def _variant_has_internal_gap(self, mapper, var_g):
"""Return True if any I or D CIGAR segment falls strictly inside var_g's interval.

This is distinct from _variant_spans_i_segment which checks endpoint CIGAR ops.
Internal gaps occur when both endpoints are in normal (=) segments but a gap
segment lies between them.
"""
gaps = self._gap_segments_within_pos_g(mapper, var_g.posedit.pos)
return bool(gaps["I"] or gaps["D"])

def _get_altered_tx_sequence(self, strand, mapper, pos_g, var_g, tx_pos):

Check failure on line 791 in src/hgvs/variantmapper.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 24 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=biocommons_hgvs&issues=AZ0qzPmFP8bj0kFn5Mm3&open=AZ0qzPmFP8bj0kFn5Mm3&pullRequest=823
"""Compute transcript-level edit for variants at alignment gaps (uncertain path).

Correctly handles I-segment positions (genomic bases with no transcript equivalent)
by filtering them out when building the transcript reference and alt sequences.

Returns (NARefAlt, adjusted_tx_pos).
"""
# Fetch genomic reference for pos_g
seq = list(self.hdp.get_seq(var_g.ac, pos_g.start.base - 1, pos_g.end.base))

# Find gap segment positions (0-based offsets into seq)
gaps = self._gap_segments_within_pos_g(mapper, pos_g)
i_offsets = gaps["I"]
# NOTE: gaps["D"] (transcript-only bases with no genomic counterpart) are detected but
# not yet used for sequence reconstruction. Variants spanning D-segments are routed here
# via _variant_has_internal_gap, and the deletion/delins mapping happens to be correct
# because D-segment bases are not present in the fetched genomic seq. Full D-segment
# splicing (inserting transcript-only bases into tx_ref_str) is a future enhancement.

# Variant boundaries within seq (0-based)
var_start = var_g.posedit.pos.start.base - pos_g.start.base
var_end = var_g.posedit.pos.end.base - pos_g.start.base + 1
edit = var_g.posedit.edit

# Build transcript ref = all genomic bases excluding I-segment positions
tx_ref_str = "".join(seq[j] for j in range(len(seq)) if j not in i_offsets)

# Build transcript alt by applying variant to prefix/suffix (filtered of I-bases)
if edit.type == "ins":
# Insertion between two positions: prefix includes var_start base
prefix_tx = [seq[j] for j in range(var_start + 1) if j not in i_offsets]
suffix_tx = [seq[j] for j in range(var_start + 1, len(seq)) if j not in i_offsets]
tx_alt_str = "".join(prefix_tx) + (edit.alt or "") + "".join(suffix_tx)
elif edit.type == "sub":
prefix_tx = [seq[j] for j in range(var_start) if j not in i_offsets]
suffix_tx = [seq[j] for j in range(var_start + 1, len(seq)) if j not in i_offsets]
tx_alt_str = "".join(prefix_tx) + (edit.alt or "") + "".join(suffix_tx)
elif edit.type == "del":
prefix_tx = [seq[j] for j in range(var_start) if j not in i_offsets]
suffix_tx = [seq[j] for j in range(var_end, len(seq)) if j not in i_offsets]
tx_alt_str = "".join(prefix_tx) + "".join(suffix_tx)
elif edit.type in ("delins", "dup", "inv", "identity"):
prefix_tx = [seq[j] for j in range(var_start) if j not in i_offsets]
if edit.type == "delins":
# For delins, include the adjacent I-seg base at exactly var_end in the suffix.
# This base is in the genomic reference but absent from the transcript; including
# it allows the prefix/suffix trimming step to cancel the double gap and produce
# the correct minimal transcript edit (e.g. 6-base delins → single SNV).
suffix_tx = [seq[j] for j in range(var_end, len(seq)) if j not in i_offsets or j == var_end]
ins_seq = edit.alt or ""
else:
# For dup/inv/identity the duplicated/inverted/copied sequence must be
# transcript-only bases — exclude any I-segment positions from the variant range.
suffix_tx = [seq[j] for j in range(var_end, len(seq)) if j not in i_offsets]
clean_variant_seq = "".join(seq[j] for j in range(var_start, var_end) if j not in i_offsets)
if edit.type == "dup":
ins_seq = clean_variant_seq * 2
elif edit.type == "inv":
ins_seq = reverse_complement(clean_variant_seq)
else: # identity
ins_seq = clean_variant_seq
tx_alt_str = "".join(prefix_tx) + ins_seq + "".join(suffix_tx)
else:
msg = f"_get_altered_tx_sequence: unsupported edit type {edit.type!r}"
raise HGVSUnsupportedOperationError(msg)

# Apply strand: convert from genomic order to transcript order
if strand == -1:
tx_ref_str = reverse_complement(tx_ref_str)
tx_alt_str = reverse_complement(tx_alt_str)

# Trim common prefix and suffix to find the minimal edit
n_prefix = 0
while n_prefix < len(tx_ref_str) and n_prefix < len(tx_alt_str) and tx_ref_str[n_prefix] == tx_alt_str[n_prefix]:
n_prefix += 1

n_suffix = 0
max_suffix = min(len(tx_ref_str) - n_prefix, len(tx_alt_str) - n_prefix)
while n_suffix < max_suffix and tx_ref_str[-(n_suffix + 1)] == tx_alt_str[-(n_suffix + 1)]:
n_suffix += 1

ref_trimmed = tx_ref_str[n_prefix: len(tx_ref_str) - n_suffix if n_suffix else None]
alt_trimmed = tx_alt_str[n_prefix: len(tx_alt_str) - n_suffix if n_suffix else None]

adjusted_pos = copy.deepcopy(tx_pos)
adjusted_pos.start.base += n_prefix
adjusted_pos.end.base -= n_suffix

return hgvs.edit.NARefAlt(ref=ref_trimmed, alt=alt_trimmed), adjusted_pos

def _update_gene_symbol(self, var, symbol):
if not symbol:
symbol = self.hdp.get_tx_identity_info(var.ac).get("hgnc", None)
Expand Down
Binary file modified tests/data/cache-py3.hdp
Binary file not shown.
48 changes: 48 additions & 0 deletions tests/test_hgvs_assemblymapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,54 @@ def test_projection_at_alignment_discrepancy(self):

self.assertEqual(str(var_g), hgvs_g)

# g_to_c next to 3' UTR
hgvs_g = "NC_000002.11:g.182402910_182402911insTTACT"
hgvs_c = "NM_001030311.2:c.1677_*1insAGTAA"
var_g = self.hp.parse_hgvs_variant(hgvs_g)
var_c = self.am37.g_to_c(var_g, "NM_001030311.2")
assert str(var_c) == hgvs_c

# g_to_c for a variant adjacent to a CIGAR I-segment (genome has extra base at
# position 119027727 with no transcript equivalent). The variant covers only "="
# territory (119027721-119027726), so pos_c.uncertain=False and the strand-flip
# path applies directly. The 6-base delins on the minus-strand gene maps to a
# 6-base delins on the transcript after reverse-complementing the alt.
hgvs_g = "NC_000011.10:g.119027721_119027726delinsTCACA"
hgvs_c = "NM_001164277.1:c.532G>A"

var_g = self.hp.parse_hgvs_variant(hgvs_g)
var_c = self.am.g_to_c(var_g, "NM_001164277.1")

assert str(var_g) == hgvs_g
assert str(var_c) == hgvs_c

# g_to_c for a variant with a CIGAR I-segment strictly inside the interval.
# g.119027726 maps to c.527 (=), g.119027727 is the I-segment (no transcript
# equivalent), g.119027728 maps to c.526 (=). Both endpoints are in = segments
# so pos_c.uncertain=False, but the genomic span (3 bases) is wider than the
# transcript span (2 bases). The fix routes this through sequence reconstruction
# so the I-segment base is excluded from the transcript edit.
hgvs_g2 = "NC_000011.10:g.119027726_119027728delinsTT"
hgvs_c2 = "NM_001164277.1:c.526_527delinsAA"

var_g2 = self.hp.parse_hgvs_variant(hgvs_g2)
var_c2 = self.am.g_to_c(var_g2, "NM_001164277.1")

assert str(var_g2) == hgvs_g2
assert str(var_c2) == hgvs_c2

# deletion spanning a 'D' segment in the CIGAR alignment
# NM_015120.4 exon 1 CIGAR: 146=3D289= (transcript has 3 extra bases not in genome)
# g.73385901_73385903del spans across the 3D boundary → maps to c.34_36del
hgvs_g = "NC_000002.12:g.73385901_73385903del"
hgvs_c = "NM_015120.4:c.34_36del"

var_g = self.hp.parse_hgvs_variant(hgvs_g)
var_c = self.am.g_to_c(var_g, "NM_015120.4")

assert str(var_c) == hgvs_c


def test_c_to_p_with_stop_gain(self):
# issue-474
hgvs_c = "NM_080877.2:c.1733_1735delinsTTT"
Expand Down