diff --git a/src/hgvs/alignmentmapper.py b/src/hgvs/alignmentmapper.py index 894d3f58..2786f3bf 100644 --- a/src/hgvs/alignmentmapper.py +++ b/src/hgvs/alignmentmapper.py @@ -26,7 +26,6 @@ # g. ... 123 124 125 126 127 128 129 130 131 132 133 ... # - from bioutils.coordinates import strand_int_to_pm import hgvs.location @@ -41,15 +40,18 @@ from hgvs.utils import build_tx_cigar from hgvs.utils.cigarmapper import CIGARMapper +from hgvs.dataproviders.interface import Interface +from hgvs.location import BaseOffsetInterval, Interval + -def _zbc_to_hgvs(i): +def _zbc_to_hgvs(i: int) -> int: """Convert zero-based coordinate to hgvs (1 based, missing zero)""" if i >= 0: i += 1 return i -def _hgvs_to_zbc(i): +def _hgvs_to_zbc(i: int) -> int: """Convert hgvs (1 based, missing zero)""" if i >= 1: i -= 1 @@ -82,7 +84,9 @@ class AlignmentMapper: "cigar_op", ) - def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): + def __init__( + self, hdp: Interface, tx_ac: str, alt_ac: str, alt_aln_method: str + ) -> None: self.tx_ac = tx_ac self.alt_ac = alt_ac self.alt_aln_method = alt_aln_method @@ -108,11 +112,16 @@ def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): # is that exons are adjacent. Assert that here. 
sorted_tx_exons = sorted(tx_exons, key=lambda e: e["ord"]) for i in range(1, len(sorted_tx_exons)): - if sorted_tx_exons[i - 1]["tx_end_i"] != sorted_tx_exons[i]["tx_start_i"]: + if ( + sorted_tx_exons[i - 1]["tx_end_i"] + != sorted_tx_exons[i]["tx_start_i"] + ): raise HGVSDataNotAvailableError( "AlignmentMapper(tx_ac={self.tx_ac}, " "alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): " - "Exons {a} and {b} are not adjacent".format(self=self, a=i, b=i + 1) + "Exons {a} and {b} are not adjacent".format( + self=self, a=i, b=i + 1 + ) ) self.strand = tx_exons[0]["alt_strand"] @@ -138,11 +147,11 @@ def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): self.tgt_len = sum(tx_identity_info["lengths"]) self.cigarmapper = None - assert not ( - (self.cds_start_i is None) ^ (self.cds_end_i is None) - ), "CDS start and end must both be defined or neither defined" + assert not ((self.cds_start_i is None) ^ (self.cds_end_i is None)), ( + "CDS start and end must both be defined or neither defined" + ) - def __str__(self): + def __str__(self) -> str: return ( "{self.__class__.__name__}: {self.tx_ac} ~ {self.alt_ac} ~ {self.alt_aln_method}; " "{strand_pm} strand; offset={self.gc_offset}".format( @@ -150,7 +159,9 @@ def __str__(self): ) ) - def g_to_n(self, g_interval, strict_bounds=None): + def g_to_n( + self, g_interval: Interval, strict_bounds: bool | None = None + ) -> BaseOffsetInterval: """convert a genomic (g.) interval to a transcript cDNA (n.) interval""" if strict_bounds is None: @@ -183,7 +194,9 @@ def g_to_n(self, g_interval, strict_bounds=None): uncertain=frs_cigar in "DI" or fre_cigar in "DI", ) - def n_to_g(self, n_interval, strict_bounds=None): + def n_to_g( + self, n_interval: BaseOffsetInterval, strict_bounds: bool | None = None + ) -> Interval: """convert a transcript (n.) interval to a genomic (g.) 
interval""" if strict_bounds is None: @@ -210,12 +223,16 @@ def n_to_g(self, n_interval, strict_bounds=None): # The returned interval would be uncertain when locating at alignment gaps return hgvs.location.Interval( - start=hgvs.location.SimplePosition(gs, uncertain=n_interval.start.uncertain), + start=hgvs.location.SimplePosition( + gs, uncertain=n_interval.start.uncertain + ), end=hgvs.location.SimplePosition(ge, uncertain=n_interval.end.uncertain), uncertain=grs_cigar in "DI" or gre_cigar in "DI", ) - def n_to_c(self, n_interval, strict_bounds=None): + def n_to_c( + self, n_interval: BaseOffsetInterval, strict_bounds: bool | None = None + ) -> BaseOffsetInterval: """convert a transcript cDNA (n.) interval to a transcript CDS (c.) interval""" if strict_bounds is None: @@ -230,7 +247,9 @@ def n_to_c(self, n_interval, strict_bounds=None): ) ) - if strict_bounds and (n_interval.start.base <= 0 or n_interval.end.base > self.tgt_len): + if strict_bounds and ( + n_interval.start.base <= 0 or n_interval.end.base > self.tgt_len + ): raise HGVSInvalidIntervalError( "The given coordinate is outside the bounds of the reference sequence." ) @@ -245,7 +264,9 @@ def pos_n_to_c(pos): else: c = pos.base - self.cds_end_i c_datum = Datum.CDS_END - return hgvs.location.BaseOffsetPosition(base=c, offset=pos.offset, datum=c_datum) + return hgvs.location.BaseOffsetPosition( + base=c, offset=pos.offset, datum=c_datum + ) c_interval = hgvs.location.BaseOffsetInterval( start=pos_n_to_c(n_interval.start), @@ -254,7 +275,9 @@ def pos_n_to_c(pos): ) return c_interval - def c_to_n(self, c_interval, strict_bounds=None): + def c_to_n( + self, c_interval: BaseOffsetInterval, strict_bounds: bool | None = None + ) -> BaseOffsetInterval: """convert a transcript CDS (c.) interval to a transcript cDNA (n.) 
interval""" if strict_bounds is None: @@ -278,7 +301,9 @@ def pos_c_to_n(pos): n -= 1 if n <= 0 or n > self.tgt_len: if strict_bounds: - raise HGVSInvalidIntervalError(f"c.{pos} coordinate is out of bounds") + raise HGVSInvalidIntervalError( + f"c.{pos} coordinate is out of bounds" + ) return hgvs.location.BaseOffsetPosition( base=n, offset=pos.offset, datum=Datum.SEQ_START ) @@ -290,16 +315,20 @@ def pos_c_to_n(pos): ) return n_interval - def g_to_c(self, g_interval, strict_bounds=None): + def g_to_c( + self, g_interval: Interval, strict_bounds: bool | None = None + ) -> BaseOffsetInterval: """convert a genomic (g.) interval to a transcript CDS (c.) interval""" return self.n_to_c(self.g_to_n(g_interval), strict_bounds=strict_bounds) - def c_to_g(self, c_interval, strict_bounds=None): + def c_to_g( + self, c_interval: BaseOffsetInterval, strict_bounds: bool | None = None + ) -> Interval: """convert a transcript CDS (c.) interval to a genomic (g.) interval""" return self.n_to_g(self.c_to_n(c_interval), strict_bounds=strict_bounds) @property - def is_coding_transcript(self): + def is_coding_transcript(self) -> bool: if (self.cds_start_i is not None) ^ (self.cds_end_i is not None): raise HGVSError( "{self.tx_ac}: CDS start_i and end_i" @@ -307,7 +336,7 @@ def is_coding_transcript(self): ) return self.cds_start_i is not None - def g_interval_is_inbounds(self, ival): + def g_interval_is_inbounds(self, ival: Interval) -> bool: grs = ival.start.base - 1 - self.gc_offset gre = ival.end.base - 1 - self.gc_offset return grs >= 0 and gre <= self.cigarmapper.ref_len diff --git a/src/hgvs/assemblymapper.py b/src/hgvs/assemblymapper.py index d30a81db..17ce9990 100644 --- a/src/hgvs/assemblymapper.py +++ b/src/hgvs/assemblymapper.py @@ -5,6 +5,7 @@ from bioutils.sequences import TranslationTable import hgvs +from hgvs.alignmentmapper import AlignmentMapper import hgvs.normalizer from hgvs.exceptions import ( HGVSDataNotAvailableError, @@ -13,6 +14,7 @@ 
HGVSUnsupportedOperationError, ) from hgvs.variantmapper import VariantMapper +from hgvs.sequencevariant import SequenceVariant _logger = logging.getLogger(__name__) @@ -50,7 +52,7 @@ class AssemblyMapper(VariantMapper): def __init__( self, - hdp, + hdp: hgvs.dataproviders.interface.Interface, assembly_name=hgvs.global_config.mapping.assembly, alt_aln_method=hgvs.global_config.mapping.alt_aln_method, normalize=hgvs.global_config.mapping.normalize, @@ -60,7 +62,7 @@ def __init__( add_gene_symbol=hgvs.global_config.mapping.add_gene_symbol, *args, **kwargs, - ): + ) -> None: """ :param object hdp: instance of hgvs.dataprovider subclass :param bool replace_reference: replace reference (entails additional network access) @@ -88,44 +90,54 @@ def __init__( self.in_par_assume = in_par_assume self._norm = None if self.normalize: - vm = VariantMapper(hdp=hdp, replace_reference=replace_reference, - prevalidation_level=prevalidation_level, - add_gene_symbol=add_gene_symbol) + vm = VariantMapper( + hdp=hdp, + replace_reference=replace_reference, + prevalidation_level=prevalidation_level, + add_gene_symbol=add_gene_symbol, + ) self._norm = hgvs.normalizer.Normalizer( - hdp, alt_aln_method=alt_aln_method, validate=False, variantmapper=vm, + hdp, + alt_aln_method=alt_aln_method, + validate=False, + variantmapper=vm, ) self._assembly_map = { - k: v for k, v in hdp.get_assembly_map(self.assembly_name).items() if k.startswith("NC_") + k: v + for k, v in hdp.get_assembly_map(self.assembly_name).items() + if k.startswith("NC_") } self._assembly_accessions = set(self._assembly_map.keys()) - def __repr__(self): + def __repr__(self) -> str: return ( "{self.__module__}.{t.__name__}(alt_aln_method={self.alt_aln_method}, " "assembly_name={self.assembly_name}, normalize={self.normalize}, " "prevalidation_level={self.prevalidation_level}, " - "replace_reference={self.replace_reference})".format(self=self, t=type(self)) + "replace_reference={self.replace_reference})".format( + self=self, 
t=type(self) + ) ) - def g_to_c(self, var_g, tx_ac): + def g_to_c(self, var_g: SequenceVariant, tx_ac: str) -> SequenceVariant: var_out = super(AssemblyMapper, self).g_to_c( var_g, tx_ac, alt_aln_method=self.alt_aln_method ) return self._maybe_normalize(var_out) - def g_to_n(self, var_g, tx_ac): + def g_to_n(self, var_g: SequenceVariant, tx_ac: str) -> SequenceVariant: var_out = super(AssemblyMapper, self).g_to_n( var_g, tx_ac, alt_aln_method=self.alt_aln_method ) return self._maybe_normalize(var_out) - def g_to_t(self, var_g, tx_ac): + def g_to_t(self, var_g: SequenceVariant, tx_ac: str) -> SequenceVariant: var_out = super(AssemblyMapper, self).g_to_t( var_g, tx_ac, alt_aln_method=self.alt_aln_method ) return self._maybe_normalize(var_out) - def c_to_g(self, var_c): + def c_to_g(self, var_c: SequenceVariant) -> SequenceVariant: alt_ac = self._alt_ac_for_tx_ac(var_c.ac) var_out = super(AssemblyMapper, self).c_to_g( var_c, alt_ac, alt_aln_method=self.alt_aln_method @@ -146,7 +158,7 @@ def t_to_g(self, var_t): ) return self._maybe_normalize(var_out) - def t_to_p(self, var_t): + def t_to_p(self, var_t: SequenceVariant) -> SequenceVariant: """Return a protein variant, or "non-coding" for non-coding variant types CAUTION: Unlike other x_to_y methods that always return @@ -165,36 +177,44 @@ def t_to_p(self, var_t): "Expected a coding (c.) or non-coding (n.) 
variant; got " + str(var_t) ) - def c_to_n(self, var_c): + def c_to_n(self, var_c: SequenceVariant) -> SequenceVariant: alt_ac = self._alt_ac_for_tx_ac(var_c.ac) var_out = super(AssemblyMapper, self).c_to_n( var_c, alt_ac=alt_ac, alt_aln_method=self.alt_aln_method ) return self._maybe_normalize(var_out) - def n_to_c(self, var_n): + def n_to_c(self, var_n: SequenceVariant) -> SequenceVariant: alt_ac = self._alt_ac_for_tx_ac(var_n.ac) var_out = super(AssemblyMapper, self).n_to_c( var_n, alt_ac=alt_ac, alt_aln_method=self.alt_aln_method ) return self._maybe_normalize(var_out) - def c_to_p(self, var_c, translation_table=TranslationTable.standard): + def c_to_p( + self, var_c: SequenceVariant, translation_table=TranslationTable.standard + ) -> SequenceVariant: alt_ac = self._alt_ac_for_tx_ac(var_c.ac) var_out = super(AssemblyMapper, self).c_to_p( - var_c, alt_ac=alt_ac, alt_aln_method=self.alt_aln_method, translation_table=translation_table + var_c, + alt_ac=alt_ac, + alt_aln_method=self.alt_aln_method, + translation_table=translation_table, ) return self._maybe_normalize(var_out) - def relevant_transcripts(self, var_g): + def relevant_transcripts(self, var_g: SequenceVariant) -> SequenceVariant: """return list of transcripts accessions (strings) for given variant, selected by genomic overlap""" tx = self.hdp.get_tx_for_region( - var_g.ac, self.alt_aln_method, var_g.posedit.pos.start.base, var_g.posedit.pos.end.base + var_g.ac, + self.alt_aln_method, + var_g.posedit.pos.start.base, + var_g.posedit.pos.end.base, ) return [e["tx_ac"] for e in tx] - def _alt_ac_for_tx_ac(self, tx_ac): + def _alt_ac_for_tx_ac(self, tx_ac: str) -> str: """return chromosomal accession for given transcript accession (and the_assembly and aln_method setting used to instantiate this AssemblyMapper) @@ -219,12 +239,18 @@ def _alt_ac_for_tx_ac(self, tx_ac): names = set(self._assembly_map[ac] for ac in alt_acs) if names != set("XY"): alts = ", ".join( - ["{ac} ({n})".format(ac=ac, 
n=self._assembly_map[ac]) for ac in alt_acs] + [ + "{ac} ({n})".format(ac=ac, n=self._assembly_map[ac]) + for ac in alt_acs + ] ) raise HGVSError( "Multiple chromosomal alignments for {tx_ac} in {an}" " using {am} (non-pseudoautosomal region) [{alts}]".format( - tx_ac=tx_ac, an=self.assembly_name, am=self.alt_aln_method, alts=alts + tx_ac=tx_ac, + an=self.assembly_name, + am=self.alt_aln_method, + alts=alts, ) ) @@ -237,7 +263,9 @@ def _alt_ac_for_tx_ac(self, tx_ac): ) ) - alt_acs = [ac for ac in alt_acs if self._assembly_map[ac] == self.in_par_assume] + alt_acs = [ + ac for ac in alt_acs if self._assembly_map[ac] == self.in_par_assume + ] if len(alt_acs) != 1: raise HGVSError( "Multiple chromosomal alignments for {tx_ac} in {an}" @@ -253,7 +281,12 @@ def _alt_ac_for_tx_ac(self, tx_ac): assert len(alt_acs) == 1, "Should have exactly one alignment at this point" return alt_acs[0] - def _fetch_AlignmentMapper(self, tx_ac, alt_ac=None, alt_aln_method=None): + def _fetch_AlignmentMapper( + self, + tx_ac: str, + alt_ac: str | None = None, + alt_aln_method: str | None = None, + ) -> AlignmentMapper: """convenience version of VariantMapper._fetch_AlignmentMapper that derives alt_ac from transcript, assembly, and alt_aln_method used to instantiate the AssemblyMapper instance @@ -264,9 +297,11 @@ def _fetch_AlignmentMapper(self, tx_ac, alt_ac=None, alt_aln_method=None): alt_ac = self._alt_ac_for_tx_ac(tx_ac) if alt_aln_method is None: alt_aln_method = self.alt_aln_method - return super(AssemblyMapper, self)._fetch_AlignmentMapper(tx_ac, alt_ac, alt_aln_method) + return super(AssemblyMapper, self)._fetch_AlignmentMapper( + tx_ac, alt_ac, alt_aln_method + ) - def _maybe_normalize(self, var): + def _maybe_normalize(self, var: SequenceVariant) -> SequenceVariant: """normalize variant if requested, and ignore HGVSUnsupportedOperationError This is better than checking whether the variant is intronic because future UTAs will support LRG, which will enable checking intronic 
variants. diff --git a/src/hgvs/edit.py b/src/hgvs/edit.py index 8f4799c7..f005690c 100644 --- a/src/hgvs/edit.py +++ b/src/hgvs/edit.py @@ -8,6 +8,7 @@ location). """ + import abc import attr @@ -15,20 +16,22 @@ import hgvs from hgvs.exceptions import HGVSError, HGVSUnsupportedOperationError +from hgvs.config import Config +from hgvs.location import Interval @attr.s(slots=True) class Edit(abc.ABC): - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: return str(self) - def _format_config_na(self, conf=None): + def _format_config_na(self, conf: Config | None = None) -> int: max_ref_length = hgvs.global_config.formatting.max_ref_length if conf and "max_ref_length" in conf: max_ref_length = conf["max_ref_length"] return max_ref_length - def _format_config_aa(self, conf=None): + def _format_config_aa(self, conf: Config | None = None) -> tuple[bool, bool, bool]: p_3_letter = hgvs.global_config.formatting.p_3_letter p_term_asterisk = hgvs.global_config.formatting.p_term_asterisk p_init_met = hgvs.global_config.formatting.p_init_met @@ -41,15 +44,15 @@ def _format_config_aa(self, conf=None): p_init_met = conf["p_init_met"] return p_3_letter, p_term_asterisk, p_init_met - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int): raise HGVSUnsupportedOperationError( "internal function _del_ins_lengths not implemented for this variant type" ) @property @abc.abstractmethod - def type(self): - """ return the type of this Edit """ + def type(self) -> str: + """return the type of this Edit""" pass @@ -63,12 +66,12 @@ class NARefAlt(Edit): :ivar uncertain: boolean indicating whether the variant is uncertain/undetermined """ - ref = attr.ib(default=None) - alt = attr.ib(default=None) - uncertain = attr.ib(default=False) + ref: str | int | None = attr.ib(default=None) + alt: str | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) @property - def ref_s(self): + def ref_s(self) -> str: """ returns a string 
representing the ref sequence, if it is not None and smells like a sequence @@ -85,7 +88,7 @@ def ref_s(self): ) @property - def ref_n(self): + def ref_n(self) -> str | None: """ returns an integer, either from the `ref` instance variable if it's a number, or the length of ref if it's a string, or None otherwise @@ -103,7 +106,7 @@ def ref_n(self): except ValueError: return len(self.ref) if self.ref else None - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: if self.ref is None and self.alt is None: raise HGVSError("RefAlt: ref and alt sequences are both undefined") @@ -138,7 +141,7 @@ def format(self, conf=None): __str__ = format - def _set_uncertain(self): + def _set_uncertain(self) -> "NARefAlt": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -147,7 +150,7 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) @@ -165,7 +168,7 @@ def type(self): edit_type = "ins" return edit_type - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int) -> tuple[int, int]: """returns (del_len, ins_len). Unspecified ref or alt returns None for del_len or ins_len respectively. 
""" @@ -179,16 +182,16 @@ def _del_ins_lengths(self, ilen): @attr.s(slots=True) class AARefAlt(Edit): - ref = attr.ib(default=None) - alt = attr.ib(default=None) - uncertain = attr.ib(default=False) - init_met = attr.ib(default=False) + ref: str | None = attr.ib(default=None) + alt: str | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) + init_met: bool = attr.ib(default=False) - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: self.ref = aa_to_aa1(self.ref) self.alt = aa_to_aa1(self.alt) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: if self.ref is None and self.alt is None: # raise HGVSError("RefAlt: ref and alt sequences are both undefined") return "=" @@ -243,7 +246,7 @@ def format(self, conf=None): __str__ = format - def _set_uncertain(self): + def _set_uncertain(self) -> "AARefAlt": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -252,7 +255,7 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) @@ -270,7 +273,7 @@ def type(self): edit_type = "ins" return edit_type - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int) -> tuple[int, int]: """returns (del_len, ins_len). Unspecified ref or alt returns None for del_len or ins_len respectively. 
""" @@ -284,7 +287,7 @@ def _del_ins_lengths(self, ilen): @attr.s(slots=True) class AASub(AARefAlt): - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: p_3_letter, p_term_asterisk, p_init_met = self._format_config_aa(conf) if p_3_letter: @@ -298,7 +301,7 @@ def format(self, conf=None): __str__ = format @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) @@ -308,31 +311,35 @@ def type(self): @attr.s(slots=True) class AAFs(Edit): - ref = attr.ib(default=None) - alt = attr.ib(default=None) + ref: str | None = attr.ib(default=None) + alt: str | None = attr.ib(default=None) length = attr.ib(default=None) uncertain = attr.ib(default=False) - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: self.ref = aa_to_aa1(self.ref) self.alt = aa_to_aa1(self.alt) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: p_3_letter, p_term_asterisk, p_init_met = self._format_config_aa(conf) st_length = self.length or "" if p_3_letter: if p_term_asterisk: - s = "{alt}fs*{length}".format(alt=aa1_to_aa3(self.alt), length=st_length) + s = "{alt}fs*{length}".format( + alt=aa1_to_aa3(self.alt), length=st_length + ) else: - s = "{alt}fsTer{length}".format(alt=aa1_to_aa3(self.alt), length=st_length) + s = "{alt}fsTer{length}".format( + alt=aa1_to_aa3(self.alt), length=st_length + ) else: s = "{alt}fs*{length}".format(alt=self.alt, length=st_length) return "(" + s + ")" if self.uncertain else s __str__ = format - def _set_uncertain(self): + def _set_uncertain(self) -> "AAFs": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -341,7 +348,7 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) @@ -351,18 +358,18 @@ def type(self): @attr.s(slots=True) class AAExt(Edit): - ref = attr.ib(default=None) - alt = 
attr.ib(default=None) - aaterm = attr.ib(default=None) - length = attr.ib(default=None) + ref: str | None = attr.ib(default=None) + alt: str | None = attr.ib(default=None) + aaterm: str | None = attr.ib(default=None) + length: int | None = attr.ib(default=None) uncertain = attr.ib(default=False) - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: self.ref = aa_to_aa1(self.ref) self.alt = aa_to_aa1(self.alt) self.aaterm = aa_to_aa1(self.aaterm) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: p_3_letter, p_term_asterisk, p_init_met = self._format_config_aa(conf) st_alt = self.alt or "" @@ -376,12 +383,14 @@ def format(self, conf=None): if p_term_asterisk and st_aaterm == "Ter": st_aaterm = "*" - s = "{alt}ext{term}{length}".format(alt=st_alt, term=st_aaterm, length=st_length) + s = "{alt}ext{term}{length}".format( + alt=st_alt, term=st_aaterm, length=st_length + ) return "(" + s + ")" if self.uncertain else s __str__ = format - def _set_uncertain(self): + def _set_uncertain(self) -> "AAExt": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -390,14 +399,14 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) """ return "ext" - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int) -> tuple[int, int]: """returns (del_len, ins_len). Unspecified ref or alt returns None for del_len or ins_len respectively. 
""" @@ -406,10 +415,10 @@ def _del_ins_lengths(self, ilen): @attr.s(slots=True) class Dup(Edit): - ref = attr.ib(default=None) - uncertain = attr.ib(default=False) + ref: str | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: max_ref_length = self._format_config_na(conf) if max_ref_length is not None: ref = self.ref_s @@ -422,7 +431,7 @@ def format(self, conf=None): __str__ = format @property - def ref_s(self): + def ref_s(self) -> str | None: """ returns a string representing the ref sequence, if it is not None and smells like a sequence """ @@ -432,7 +441,7 @@ def ref_s(self): else None ) - def _set_uncertain(self): + def _set_uncertain(self) -> "Dup": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -441,14 +450,14 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) """ return "dup" - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int) -> tuple[int, int]: """returns (del_len, ins_len). Unspecified ref or alt returns None for del_len or ins_len respectively. 
""" @@ -459,12 +468,12 @@ def _del_ins_lengths(self, ilen): @attr.s(slots=True) class Repeat(Edit): - ref = attr.ib(default=None) - min = attr.ib(default=None) - max = attr.ib(default=None) - uncertain = attr.ib(default=False) + ref: str | None = attr.ib(default=None) + min: int | None = attr.ib(default=None) + max: int | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: if self.min > self.max: raise HGVSError("Repeat min count must be less than or equal to max count") max_ref_length = self._format_config_na(conf) @@ -477,7 +486,7 @@ def format(self, conf=None): __str__ = format - def _set_uncertain(self): + def _set_uncertain(self) -> "Repeat": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -486,7 +495,7 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) @@ -505,13 +514,13 @@ class NACopy(Edit): """ copy = attr.ib(default=None) - uncertain = attr.ib(default=False) + uncertain: bool = attr.ib(default=False) - def __str__(self): + def __str__(self) -> str: s = "copy{}".format(self.copy) return "(" + s + ")" if self.uncertain else s - def _set_uncertain(self): + def _set_uncertain(self) -> "NACopy": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -520,14 +529,14 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) """ return "copy" - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int) -> tuple[int, int]: """returns (del_len, ins_len). Unspecified ref or alt returns None for del_len or ins_len respectively. 
""" @@ -538,13 +547,13 @@ def _del_ins_lengths(self, ilen): class Inv(Edit): """Inversion""" - ref = attr.ib(default=None) - uncertain = attr.ib(default=False) + ref: str | int | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def __str__(self): + def __str__(self) -> str: return "inv" - def _set_uncertain(self): + def _set_uncertain(self) -> "Inv": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -553,7 +562,7 @@ def _set_uncertain(self): return self @property - def ref_s(self): + def ref_s(self) -> str | None: return ( self.ref if (isinstance(self.ref, str) and self.ref and self.ref[0] in "ACGTUN") @@ -561,7 +570,7 @@ def ref_s(self): ) @property - def ref_n(self): + def ref_n(self) -> int | None: """ returns an integer, either from the `seq` instance variable if it's a number, or None otherwise @@ -572,14 +581,14 @@ def ref_n(self): return None @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) """ return "inv" - def _del_ins_lengths(self, ilen): + def _del_ins_lengths(self, ilen: int) -> tuple[int, int]: """returns (del_len, ins_len). Unspecified ref or alt returns None for del_len or ins_len respectively. 
""" @@ -590,19 +599,19 @@ def _del_ins_lengths(self, ilen): class Conv(Edit): """Conversion""" - from_ac = attr.ib(default=None) - from_type = attr.ib(default=None) - from_pos = attr.ib(default=None) - uncertain = attr.ib(default=False) + from_ac: str | None = attr.ib(default=None) + from_type: str | None = attr.ib(default=None) + from_pos: Interval | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def __str__(self): + def __str__(self) -> str: if self.from_ac and self.from_type and self.from_pos: s = "con{self.from_ac}:{self.from_type}.{self.from_pos}".format(self=self) else: s = "con" return "(" + s + ")" if self.uncertain else s - def _set_uncertain(self): + def _set_uncertain(self) -> "Conv": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -611,7 +620,7 @@ def _set_uncertain(self): return self @property - def type(self): + def type(self) -> str: """return the type of this Edit :returns: edit type (str) diff --git a/src/hgvs/hgvsposition.py b/src/hgvs/hgvsposition.py index 78742213..9096bad9 100644 --- a/src/hgvs/hgvsposition.py +++ b/src/hgvs/hgvsposition.py @@ -1,9 +1,8 @@ # -*- coding: utf-8 -*- -"""Represent partial HGVS tags that refer to a position without alleles - -""" +"""Represent partial HGVS tags that refer to a position without alleles""" import attr +from hgvs.location import Interval @attr.s(slots=True, repr=False) @@ -18,19 +17,22 @@ class HGVSPosition: """ - ac = attr.ib() - type = attr.ib() - pos = attr.ib() - gene = attr.ib(default=None) + ac: str = attr.ib() + type: str = attr.ib() + pos: Interval = attr.ib() + gene: str | None = attr.ib(default=None) - def __str__(self): + def __str__(self) -> str: g = "" if not self.gene else "(" + self.gene + ")" return "{self.ac}{g}:{self.type}.{self.pos}".format(self=self, g=g) - def __repr__(self): + def __repr__(self) -> str: return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a 
in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) diff --git a/src/hgvs/intervalmapper.py b/src/hgvs/intervalmapper.py index 051504e5..938bb3d8 100644 --- a/src/hgvs/intervalmapper.py +++ b/src/hgvs/intervalmapper.py @@ -60,17 +60,19 @@ class Interval: __slots__ = ("start_i", "end_i") - def __init__(self, start_i, end_i): + def __init__(self, start_i: int, end_i: int) -> None: if not (start_i <= end_i): - raise HGVSInvalidIntervalError("start_i must be less than or equal to end_i") + raise HGVSInvalidIntervalError( + "start_i must be less than or equal to end_i" + ) self.start_i = start_i self.end_i = end_i @property - def len(self): + def len(self) -> int: return self.end_i - self.start_i - def __repr__(self): + def __repr__(self) -> str: return "{self.__class__.__name__}(start_i={self.start_i},end_i={self.end_i})".format( self=self ) @@ -83,7 +85,7 @@ class IntervalPair: __slots__ = ("ref", "tgt") - def __init__(self, ref, tgt): + def __init__(self, ref: Interval, tgt: Interval) -> None: if not ( (ref.len == tgt.len) or (ref.len == 0 and tgt.len != 0) @@ -95,17 +97,25 @@ def __init__(self, ref, tgt): self.ref = ref self.tgt = tgt - def __repr__(self): - return "{self.__class__.__name__}(ref={self.ref},tgt={self.tgt})".format(self=self) + def __repr__(self) -> str: + return "{self.__class__.__name__}(ref={self.ref},tgt={self.tgt})".format( + self=self + ) class IntervalMapper: """Provides mapping between sequence coordinates according to an ordered set of IntervalPairs.""" - __slots__ = ("interval_pairs", "ref_intervals", "tgt_intervals", "ref_len", "tgt_len") + __slots__ = ( + "interval_pairs", + "ref_intervals", + "tgt_intervals", + "ref_len", + "tgt_len", + ) - def __init__(self, interval_pairs): + def __init__(self, interval_pairs: list[IntervalPair]) -> None: """ :param interval_pairs: an ordered list of IntervalPair instances :type interval_pairs: list (of IntervalPair instances). 
@@ -128,7 +138,7 @@ def _validate_intervals(ivs): self.tgt_len = sum([iv.len for iv in self.tgt_intervals]) @staticmethod - def from_cigar(cigar): + def from_cigar(cigar: str) -> "IntervalMapper": """ :param cigar: a Compact Idiosyncratic Gapped Alignment Report string :type cigar: str. @@ -136,14 +146,28 @@ def from_cigar(cigar): """ return IntervalMapper(cigar_to_intervalpairs(cigar)) - def map_ref_to_tgt(self, start_i, end_i, max_extent=False): - return self._map(self.ref_intervals, self.tgt_intervals, start_i, end_i, max_extent) + def map_ref_to_tgt( + self, start_i: int, end_i: int, max_extent: bool = False + ) -> tuple[int, int]: + return self._map( + self.ref_intervals, self.tgt_intervals, start_i, end_i, max_extent + ) - def map_tgt_to_ref(self, start_i, end_i, max_extent=False): - return self._map(self.tgt_intervals, self.ref_intervals, start_i, end_i, max_extent) + def map_tgt_to_ref( + self, start_i: int, end_i: int, max_extent: bool = False + ) -> tuple[int, int]: + return self._map( + self.tgt_intervals, self.ref_intervals, start_i, end_i, max_extent + ) @staticmethod - def _map(from_ivs, to_ivs, from_start_i, from_end_i, max_extent): + def _map( + from_ivs: list[Interval], + to_ivs: list[Interval], + from_start_i: int, + from_end_i: int, + max_extent: bool, + ) -> tuple[int, int]: def iv_map(from_ivs, to_ivs, from_start_i, from_end_i, max_extent): """returns the intervals indexes in which from_start_i and from_end_i occur""" # first look for 0-width interval that matches @@ -155,8 +179,16 @@ def iv_map(from_ivs, to_ivs, from_start_i, from_end_i, max_extent): if len(seil) > 0: si = ei = seil[0] else: - sil = [i for i, iv in enumerate(from_ivs) if iv.start_i <= from_start_i <= iv.end_i] - eil = [i for i, iv in enumerate(from_ivs) if iv.start_i <= from_end_i <= iv.end_i] + sil = [ + i + for i, iv in enumerate(from_ivs) + if iv.start_i <= from_start_i <= iv.end_i + ] + eil = [ + i + for i, iv in enumerate(from_ivs) + if iv.start_i <= from_end_i <= 
iv.end_i + ] if len(sil) == 0 or len(eil) == 0: raise HGVSInvalidIntervalError( "start or end or both are beyond the bounds of transcript record" @@ -175,7 +207,9 @@ def clip_to_iv(iv, pos): to_start_i = clip_to_iv( to_ivs[si], to_ivs[si].start_i + (from_start_i - from_ivs[si].start_i) ) - to_end_i = clip_to_iv(to_ivs[ei], to_ivs[ei].end_i - (from_ivs[ei].end_i - from_end_i)) + to_end_i = clip_to_iv( + to_ivs[ei], to_ivs[ei].end_i - (from_ivs[ei].end_i - from_end_i) + ) return to_start_i, to_end_i @@ -186,22 +220,22 @@ class CIGARElement: __slots__ = ("len", "op") - def __init__(self, len, op): + def __init__(self, len: int, op: str) -> None: self.len = len self.op = op @property - def ref_len(self): + def ref_len(self) -> int: """returns number of nt/aa consumed in reference sequence for this edit""" return self.len if self.op in "=INX" else 0 @property - def tgt_len(self): + def tgt_len(self) -> int: """returns number of nt/aa consumed in target sequence for this edit""" return self.len if self.op in "=DX" else 0 -def cigar_to_intervalpairs(cigar): +def cigar_to_intervalpairs(cigar: str) -> list[IntervalPair]: """For a given CIGAR string, return a list of (Interval,Interval) pairs. 
The length of the returned list will be equal to the number of CIGAR operations @@ -216,7 +250,8 @@ def cigar_to_intervalpairs(cigar): ref_pos = tgt_pos = 0 for i, ce in enumerate(ces): ips[i] = IntervalPair( - Interval(ref_pos, ref_pos + ce.ref_len), Interval(tgt_pos, tgt_pos + ce.tgt_len) + Interval(ref_pos, ref_pos + ce.ref_len), + Interval(tgt_pos, tgt_pos + ce.tgt_len), ) ref_pos += ce.ref_len tgt_pos += ce.tgt_len diff --git a/src/hgvs/location.py b/src/hgvs/location.py index 1be10915..4e7cf582 100644 --- a/src/hgvs/location.py +++ b/src/hgvs/location.py @@ -16,6 +16,7 @@ """ from functools import total_ordering +from hgvs.config import Config import attr from bioutils.sequences import aa1_to_aa3 @@ -28,25 +29,28 @@ @attr.s(slots=True, repr=False, cmp=False) @total_ordering class SimplePosition: - base = attr.ib(default=None) - uncertain = attr.ib(default=False) + base: int | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def __str__(self): + def __str__(self) -> str: self.validate() s = "?" 
if self.base is None else str(self.base) return "(" + s + ")" if self.uncertain else s - def format(self, conf): + def format(self, conf: Config) -> str: return str(self) - def __repr__(self): + def __repr__(self) -> str: return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) @property - def is_uncertain(self): + def is_uncertain(self) -> bool: """return True if the position is marked uncertain or undefined""" return self.uncertain or self.base is None @@ -55,25 +59,35 @@ def _set_uncertain(self): self.uncertain = True return self - def validate(self): + def validate(self) -> tuple[ValidationLevel, str | None]: if self.base is not None and self.base < 1: return (ValidationLevel.ERROR, "Position base must be >= 1") return (ValidationLevel.VALID, None) - def __sub__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot substract coordinates of different representations" + def __sub__(lhs: "SimplePosition", rhs: "SimplePosition") -> int: + assert type(lhs) is type(rhs), ( + "Cannot substract coordinates of different representations" + ) return lhs.base - rhs.base - def __eq__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __eq__(lhs: "SimplePosition", rhs: "SimplePosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base == rhs.base - def __lt__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __lt__(lhs: "SimplePosition", rhs: "SimplePosition") -> bool: + assert type(lhs) is 
type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base < rhs.base @@ -113,12 +127,12 @@ class BaseOffsetPosition: +----------+------------+-------+---------+------------------------------------------+ """ - base = attr.ib(default=None) - offset = attr.ib(default=0) - datum = attr.ib(default=Datum.SEQ_START) - uncertain = attr.ib(default=False) + base: int | None = attr.ib(default=None) + offset: int = attr.ib(default=0) + datum: Datum = attr.ib(default=Datum.SEQ_START) + uncertain: bool = attr.ib(default=False) - def validate(self): + def validate(self) -> tuple[ValidationLevel, str | None]: if self.base is not None and self.base == 0: return (ValidationLevel.ERROR, "BaseOffsetPosition base may not be 0") if ( @@ -133,7 +147,7 @@ def validate(self): ) return (ValidationLevel.VALID, None) - def __str__(self): + def __str__(self) -> str: self.validate() base_str = ( "?" @@ -143,37 +157,46 @@ def __str__(self): else str(self.base) ) offset_str = ( - "+?" if self.offset is None else "" if self.offset == 0 else "%+d" % self.offset + "+?" 
+ if self.offset is None + else "" + if self.offset == 0 + else "%+d" % self.offset ) pos = base_str + offset_str return "(" + pos + ")" if self.uncertain else pos - def format(self, conf): + def format(self, conf: Config) -> str: return str(self) - def __repr__(self): + def __repr__(self) -> str: return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) - def _set_uncertain(self): + def _set_uncertain(self) -> "BaseOffsetPosition": "mark this location as uncertain and return reference to self; this is called during parsing (see hgvs.ometa)" self.uncertain = True return self @property - def is_uncertain(self): + def is_uncertain(self) -> bool: """return True if the position is marked uncertain or undefined""" return self.uncertain or self.base is None or self.offset is None @property - def is_intronic(self): + def is_intronic(self) -> bool: """returns True if the variant is intronic (if the offset is None or non-zero)""" return self.offset is None or self.offset != 0 - def __sub__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot substract coordinates of different representations" + def __sub__(lhs: "BaseOffsetPosition", rhs: "BaseOffsetPosition") -> int: + assert type(lhs) is type(rhs), ( + "Cannot substract coordinates of different representations" + ) if lhs.datum != rhs.datum: raise HGVSUnsupportedOperationError( "Interval length measured from different datums is ill-defined" @@ -187,16 +210,26 @@ def __sub__(lhs, rhs): straddles_zero = 1 if (lhs.base > 0 and rhs.base < 0) else 0 return lhs.base - rhs.base - straddles_zero - def __eq__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __eq__(lhs: "BaseOffsetPosition", rhs: "BaseOffsetPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of 
different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") - return lhs.datum == rhs.datum and lhs.base == rhs.base and lhs.offset == rhs.offset + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) + return ( + lhs.datum == rhs.datum and lhs.base == rhs.base and lhs.offset == rhs.offset + ) - def __lt__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __lt__(lhs: "BaseOffsetPosition", rhs: "BaseOffsetPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) if lhs.datum == rhs.datum: if lhs.base == rhs.base: return lhs.offset < rhs.offset @@ -220,18 +253,18 @@ def __lt__(lhs, rhs): @attr.s(slots=True, repr=False, cmp=False) class AAPosition: - base = attr.ib(default=None) - aa = attr.ib(default=None) - uncertain = attr.ib(default=False) + base: int | None = attr.ib(default=None) + aa: str | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def validate(self): + def validate(self) -> tuple[ValidationLevel, str | None]: if self.base is not None and self.base != "" and self.base < 1: return (ValidationLevel.ERROR, "AAPosition location must be >=1") if self.aa is not None and len(self.aa) > 1: return (ValidationLevel.ERROR, "More than 1 AA associated with position") return (ValidationLevel.VALID, None) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: self.validate() p_3_letter = hgvs.global_config.formatting.p_3_letter @@ -253,69 +286,94 @@ def format(self, conf=None): __str__ = format - def __repr__(self): + def __repr__(self) -> 
str: return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) @property - def pos(self): + def pos(self) -> int | None: """return base, for backward compatibility""" return self.base - def _set_uncertain(self): + def _set_uncertain(self) -> "AAPosition": "mark this location as uncertain and return reference to self; this is called during parsing (see hgvs.ometa)" self.uncertain = True return self @property - def is_uncertain(self): + def is_uncertain(self) -> bool: """return True if the position is marked uncertain or undefined""" return self.uncertain or self.base is None or self.aa is None - def __sub__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot substract coordinates of different representations" + def __sub__(lhs: "AAPosition", rhs: "AAPosition") -> int: + assert type(lhs) is type(rhs), ( + "Cannot substract coordinates of different representations" + ) return lhs.base - rhs.base - def __eq__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __eq__(lhs: "AAPosition", rhs: "AAPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base == rhs.base and lhs.aa == rhs.aa - def __lt__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __lt__(lhs: "AAPosition", rhs: "AAPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare 
coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base < rhs.base - def __gt__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __gt__(lhs: "AAPosition", rhs: "AAPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base > rhs.base - def __le__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __le__(lhs: "AAPosition", rhs: "AAPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base <= rhs.base - def __ge__(lhs, rhs): - assert type(lhs) == type(rhs), "Cannot compare coordinates of different representations" + def __ge__(lhs: "AAPosition", rhs: "AAPosition") -> bool: + assert type(lhs) is type(rhs), ( + "Cannot compare coordinates of different representations" + ) if lhs.uncertain or rhs.uncertain: - raise HGVSUnsupportedOperationError("Cannot compare coordinates of uncertain positions") + raise HGVSUnsupportedOperationError( + "Cannot compare coordinates of uncertain positions" + ) return lhs.base >= rhs.base @attr.s(slots=True, repr=False) class Interval: - start = attr.ib(default=None) - end = attr.ib(default=None) - uncertain = attr.ib(default=False) + start: SimplePosition | None = attr.ib(default=None) + end: SimplePosition | None = attr.ib(default=None) + 
uncertain: bool = attr.ib(default=False) - def validate(self): + def validate(self) -> tuple[ValidationLevel, str | None]: if self.start: (res, msg) = self.start.validate() if res != ValidationLevel.VALID: @@ -331,11 +389,14 @@ def validate(self): if self.start <= self.end: return (ValidationLevel.VALID, None) else: - return (ValidationLevel.ERROR, "base start position must be <= end position") + return ( + ValidationLevel.ERROR, + "base start position must be <= end position", + ) except HGVSUnsupportedOperationError as err: return (ValidationLevel.WARNING, str(err)) - def format(self, conf=None): + def format(self, conf: Config = None) -> str: if self.start is None: return "" if self.end is None or self.start == self.end: @@ -345,22 +406,25 @@ def format(self, conf=None): __str__ = format - def __repr__(self): + def __repr__(self) -> str: return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) - def _set_uncertain(self): + def _set_uncertain(self) -> "Interval": "mark this interval as uncertain and return reference to self; this is called during parsing (see hgvs.ometa)" self.uncertain = True return self - def _length(self): + def _length(self) -> int: return 1 if self.end is None else self.end - self.start + 1 @property - def is_uncertain(self): + def is_uncertain(self) -> bool: """return True if the position is marked uncertain or undefined""" return self.uncertain or self.start.is_uncertain or self.end.is_uncertain @@ -373,7 +437,7 @@ class BaseOffsetInterval(Interval): """ - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: # #330: In a post-ter interval like *87_91, the * binds only # to the start. This means that the start.datum is CDS_END, # but the end.datum is CDS_START (the default). 
@@ -381,7 +445,7 @@ def __attrs_post_init__(self): self.end.datum = Datum.CDS_END self.check_datum() - def check_datum(self): + def check_datum(self) -> None: # check for valid combinations of start and end datums if (self.start.datum, self.end.datum) not in [ (Datum.SEQ_START, Datum.SEQ_START), diff --git a/src/hgvs/normalizer.py b/src/hgvs/normalizer.py index 4c8f3837..7f235b4d 100644 --- a/src/hgvs/normalizer.py +++ b/src/hgvs/normalizer.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -"""hgvs.normalizer -""" +"""hgvs.normalizer""" import copy import logging @@ -25,13 +24,13 @@ class Normalizer: def __init__( self, - hdp, + hdp: hgvs.dataproviders.interface.Interface, cross_boundaries=hgvs.global_config.normalizer.cross_boundaries, shuffle_direction=hgvs.global_config.normalizer.shuffle_direction, alt_aln_method=hgvs.global_config.mapping.alt_aln_method, validate=hgvs.global_config.normalizer.validate, variantmapper=None, - ): + ) -> None: """Initialize and configure the normalizer :param hdp: HGVS Data Provider Interface-compliant instance @@ -42,9 +41,9 @@ def __init__( :param validate: whether validating the input variant before normalizing """ - assert ( - shuffle_direction == 3 or shuffle_direction == 5 - ), "The shuffling direction should be 3 (3' most) or 5 (5' most)." + assert shuffle_direction == 3 or shuffle_direction == 5, ( + "The shuffling direction should be 3 (3' most) or 5 (5' most)." 
+ ) self.hdp = hdp self.shuffle_direction = shuffle_direction self.cross_boundaries = cross_boundaries @@ -54,11 +53,13 @@ def __init__( self.validator = hgvs.validator.IntrinsicValidator(strict=False) self.vm = variantmapper or hgvs.variantmapper.VariantMapper(self.hdp) - def normalize(self, var): + def normalize( + self, var: hgvs.sequencevariant.SequenceVariant + ) -> hgvs.sequencevariant.SequenceVariant: """Perform sequence variants normalization for single variant""" - assert isinstance( - var, hgvs.sequencevariant.SequenceVariant - ), "variant must be a parsed HGVS sequence variant object" + assert isinstance(var, hgvs.sequencevariant.SequenceVariant), ( + "variant must be a parsed HGVS sequence variant object" + ) # keep a shallow reference to the original variant, to be returned # as-is under certain circumstances @@ -71,7 +72,12 @@ def normalize(self, var): if var.posedit is not None and isinstance(var.posedit, hgvs.edit.AARefAlt): init_met = var.posedit.init_met - if var.posedit is None or var.posedit.uncertain or init_met or var.posedit.pos is None: + if ( + var.posedit is None + or var.posedit.uncertain + or init_met + or var.posedit.pos is None + ): return var type = var.type @@ -116,7 +122,9 @@ def is_valid_pos(ac, pos): # was invalid. 
return "Bad Request" not in str(e) - if var.posedit.pos.start.base < 0 or not is_valid_pos(var.ac, var.posedit.pos.end.base): + if var.posedit.pos.start.base < 0 or not is_valid_pos( + var.ac, var.posedit.pos.end.base + ): if hgvs.global_config.mapping.strict_bounds: raise HGVSInvalidVariantError(f"{var}: coordinates are out-of-bounds") _logger.warning(f"{var}: coordinates are out-of-bounds; returning as-is") @@ -210,7 +218,9 @@ def is_valid_pos(ac, pos): return var_norm - def _get_boundary(self, var): + def _get_boundary( + self, var: hgvs.sequencevariant.SequenceVariant + ) -> tuple[int, int]: """Get the position of exon-intron boundary for current variant""" if var.type == "r" or var.type == "n": if self.cross_boundaries: @@ -223,7 +233,9 @@ def _get_boundary(self, var): "No mapping info available for {ac}".format(ac=var.ac) ) map_info = [ - item for item in map_info if item["alt_aln_method"] == self.alt_aln_method + item + for item in map_info + if item["alt_aln_method"] == self.alt_aln_method ] alt_ac = map_info[0]["alt_ac"] @@ -301,7 +313,7 @@ def _get_boundary(self, var): # For variant type of g and m etc. return 0, float("inf") - def _get_tgt_length(self, var): + def _get_tgt_length(self, var: hgvs.sequencevariant.SequenceVariant) -> int: """Get the total length of the whole reference sequence""" if var.type == "g" or var.type == "m": return float("inf") @@ -315,7 +327,14 @@ def _get_tgt_length(self, var): tgt_len = sum(identity_info["lengths"]) return tgt_len - def _fetch_bounded_seq(self, var, start, end, window_size, boundary): + def _fetch_bounded_seq( + self, + var: hgvs.sequencevariant.SequenceVariant, + start: int, + end: int, + window_size: int, + boundary: tuple[int, int], + ) -> str: """Fetch reference sequence from hgvs data provider. 
The start position is 0 and the interval is half open @@ -336,7 +355,9 @@ def _fetch_bounded_seq(self, var, start, end, window_size, boundary): return seq - def _get_ref_alt(self, var, boundary): + def _get_ref_alt( + self, var: hgvs.sequencevariant.SequenceVariant, boundary: tuple[int, int] + ) -> tuple[str, str]: """Get reference allele and alternative allele of the variant""" # Get reference allele @@ -346,7 +367,11 @@ def _get_ref_alt(self, var, boundary): # For NARefAlt and Inv if var.posedit.edit.ref_s is None or var.posedit.edit.ref == "": ref = self._fetch_bounded_seq( - var, var.posedit.pos.start.base - 1, var.posedit.pos.end.base, 0, boundary + var, + var.posedit.pos.start.base - 1, + var.posedit.pos.end.base, + 0, + boundary, ) else: ref = var.posedit.edit.ref @@ -362,7 +387,11 @@ def _get_ref_alt(self, var, boundary): alt = "" elif var.posedit.edit.type == "dup": alt = var.posedit.edit.ref or self._fetch_bounded_seq( - var, var.posedit.pos.start.base - 1, var.posedit.pos.end.base, 0, boundary + var, + var.posedit.pos.start.base - 1, + var.posedit.pos.end.base, + 0, + boundary, ) elif var.posedit.edit.type == "inv": alt = reverse_complement(ref) @@ -371,7 +400,9 @@ def _get_ref_alt(self, var, boundary): return ref, alt - def _normalize_alleles(self, var, boundary): + def _normalize_alleles( + self, var: hgvs.sequencevariant.SequenceVariant, boundary: tuple[int, int] + ) -> tuple[int, int, tuple[str, str]]: """Normalize the variant until it could not be shuffled""" ref, alt = self._get_ref_alt(var, boundary) @@ -427,7 +458,9 @@ def _normalize_alleles(self, var, boundary): start -= boundary[0] + 1 - base stop -= boundary[0] + 1 - base base = boundary[0] + 1 - ref_seq = self._fetch_bounded_seq(var, base - 1, base + stop - 1, start, boundary) + ref_seq = self._fetch_bounded_seq( + var, base - 1, base + stop - 1, start, boundary + ) if ref_seq == "": break orig_start, orig_stop = start, stop @@ -443,6 +476,7 @@ def _normalize_alleles(self, var, boundary): 
return base + start, base + stop, (ref, alt) + # # Copyright 2018 HGVS Contributors (https://github.com/biocommons/hgvs) # diff --git a/src/hgvs/parser.py b/src/hgvs/parser.py index 1d584c45..ab5ca0a0 100644 --- a/src/hgvs/parser.py +++ b/src/hgvs/parser.py @@ -23,6 +23,7 @@ import hgvs.sequencevariant from hgvs.exceptions import HGVSParseError from hgvs.generated.hgvs_grammar import createParserClass +from typing import TextIO class Parser: @@ -86,7 +87,9 @@ class Parser: """ - def __init__(self, grammar_fn=None, expose_all_rules=False): + def __init__( + self, grammar_fn: TextIO | None = None, expose_all_rules: bool = False + ): bindings = {"hgvs": hgvs, "bioutils": bioutils, "copy": copy} if grammar_fn is None: self._grammar = parsley.wrapGrammar( @@ -99,7 +102,7 @@ def __init__(self, grammar_fn=None, expose_all_rules=False): self._logger = logging.getLogger(__name__) self._expose_rule_functions(expose_all_rules) - def parse(self, v) -> hgvs.sequencevariant.SequenceVariant: + def parse(self, v: str) -> hgvs.sequencevariant.SequenceVariant: """parse HGVS variant `v`, returning a SequenceVariant :param str v: an HGVS-formatted variant as a string @@ -108,7 +111,7 @@ def parse(self, v) -> hgvs.sequencevariant.SequenceVariant: """ return self.parse_hgvs_variant(v) - def _expose_rule_functions(self, expose_all_rules=False): + def _expose_rule_functions(self, expose_all_rules: bool = False) -> None: """add parse functions for public grammar rules Defines a function for each public grammar rule, based on @@ -119,7 +122,7 @@ def _expose_rule_functions(self, expose_all_rules=False): """ - def make_parse_rule_function(rule_name): + def make_parse_rule_function(rule_name: str): "builds a wrapper function that parses a string with the specified rule" def rule_fxn(s): @@ -146,7 +149,9 @@ def rule_fxn(s): ] if not expose_all_rules: exposed_rules = [ - rule_name for rule_name in exposed_rules if exposed_rule_re.match(rule_name) + rule_name + for rule_name in exposed_rules + if 
exposed_rule_re.match(rule_name) ] for rule_name in exposed_rules: att_name = "parse_" + rule_name diff --git a/src/hgvs/posedit.py b/src/hgvs/posedit.py index a7618230..70f6e8b8 100644 --- a/src/hgvs/posedit.py +++ b/src/hgvs/posedit.py @@ -1,12 +1,13 @@ # -*- coding: utf-8 -*- -"""implements a (position,edit) tuple that represents a localized sequence change - -""" +"""implements a (position,edit) tuple that represents a localized sequence change""" import attr from hgvs.enums import ValidationLevel from hgvs.exceptions import HGVSUnsupportedOperationError +from hgvs.location import Interval +from hgvs.edit import Edit +from hgvs.config import Config @attr.s(slots=True, repr=False) @@ -15,16 +16,18 @@ class PosEdit: represents a **simple** variant, consisting of a single position and edit pair """ - pos = attr.ib(default=None) - edit = attr.ib(default=None) - uncertain = attr.ib(default=False) + pos: Interval | None = attr.ib(default=None) + edit: Edit | None = attr.ib(default=None) + uncertain: bool = attr.ib(default=False) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: """Formatting the string of PosEdit""" if self.pos is None: rv = str(self.edit.format(conf)) else: - rv = "{pos}{edit}".format(pos=self.pos.format(conf), edit=self.edit.format(conf)) + rv = "{pos}{edit}".format( + pos=self.pos.format(conf), edit=self.edit.format(conf) + ) if self.uncertain: if self.edit in ["0", ""]: @@ -38,10 +41,13 @@ def format(self, conf=None): def __repr__(self): return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) - def _set_uncertain(self): + def _set_uncertain(self) -> "PosEdit": """sets the uncertain flag to True; used primarily by the HGVS grammar :returns: self @@ -49,7 +55,7 @@ def _set_uncertain(self): self.uncertain = True return self - def 
length_change(self, on_error_raise=True): + def length_change(self, on_error_raise: bool = True) -> int | None: """Returns the net length change for this posedit. The method for computing the net length change depends on the @@ -86,7 +92,7 @@ def length_change(self, on_error_raise=True): raise return None - def validate(self): + def validate(self) -> tuple[str, str | None]: if self.pos: (res, msg) = self.pos.validate() if res != ValidationLevel.VALID: @@ -98,7 +104,10 @@ def validate(self): # Check del length if self.edit.type in ["del", "delins"]: ref_len = self.edit.ref_n - if ref_len is not None and ref_len != self.pos.end - self.pos.start + 1: + if ( + ref_len is not None + and ref_len != self.pos.end - self.pos.start + 1 + ): return ( ValidationLevel.ERROR, "Length implied by coordinates must equal sequence deletion length", diff --git a/src/hgvs/projector.py b/src/hgvs/projector.py index 8c9374fa..1daa091a 100644 --- a/src/hgvs/projector.py +++ b/src/hgvs/projector.py @@ -8,6 +8,9 @@ import hgvs import hgvs.alignmentmapper +from hgvs.dataproviders.interface import Interface +import hgvs.location +import hgvs.sequencevariant class Projector: @@ -29,39 +32,49 @@ class Projector: def __init__( self, - hdp, - alt_ac, - src_ac, - dst_ac, - src_alt_aln_method=hgvs.global_config.mapping.alt_aln_method, - dst_alt_aln_method=hgvs.global_config.mapping.alt_aln_method, - ): + hdp: Interface, + alt_ac: str, + src_ac: str, + dst_ac: str, + src_alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + dst_alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> None: self.hdp = hdp self.alt_ac = alt_ac - self.src_tm = hgvs.alignmentmapper.AlignmentMapper(hdp, src_ac, alt_ac, src_alt_aln_method) - self.dst_tm = hgvs.alignmentmapper.AlignmentMapper(hdp, dst_ac, alt_ac, dst_alt_aln_method) - - def project_interval_forward(self, c_interval): + self.src_tm = hgvs.alignmentmapper.AlignmentMapper( + hdp, src_ac, alt_ac, src_alt_aln_method + ) + self.dst_tm 
= hgvs.alignmentmapper.AlignmentMapper( + hdp, dst_ac, alt_ac, dst_alt_aln_method + ) + + def project_interval_forward( + self, c_interval: hgvs.location.Interval + ) -> hgvs.location.Interval: """ project c_interval on the source transcript to the destination transcript - :param c_interval: an :class:`hgvs.interval.Interval` object on the source transcript - :returns: c_interval: an :class:`hgvs.interval.Interval` object on the destination transcript + :param c_interval: an :class:`hgvs.location.Interval` object on the source transcript + :returns: c_interval: an :class:`hgvs.location.Interval` object on the destination transcript """ return self.dst_tm.g_to_c(self.src_tm.c_to_g(c_interval)) - def project_interval_backward(self, c_interval): + def project_interval_backward( + self, c_interval: hgvs.location.Interval + ) -> hgvs.location.Interval: """ project c_interval on the destination transcript to the source transcript - :param c_interval: an :class:`hgvs.interval.Interval` object on the destination transcript - :returns: c_interval: an :class:`hgvs.interval.Interval` object on the source transcript + :param c_interval: an :class:`hgvs.location.Interval` object on the destination transcript + :returns: c_interval: an :class:`hgvs.location.Interval` object on the source transcript """ return self.src_tm.g_to_c(self.dst_tm.c_to_g(c_interval)) - def project_variant_forward(self, c_variant): + def project_variant_forward( + self, c_variant: hgvs.sequencevariant.SequenceVariant + ) -> hgvs.sequencevariant.SequenceVariant: """ project c_variant on the source transcript onto the destination transcript @@ -77,7 +90,9 @@ def project_variant_forward(self, c_variant): new_c_variant.posedit.pos = self.project_interval_forward(c_variant.posedit.pos) return new_c_variant - def project_variant_backward(self, c_variant): + def project_variant_backward( + self, c_variant: hgvs.sequencevariant.SequenceVariant + ) -> hgvs.sequencevariant.SequenceVariant: """ project c_variant on 
the source transcript onto the destination transcript @@ -90,5 +105,7 @@ def project_variant_backward(self, c_variant): ) new_c_variant = copy.deepcopy(c_variant) new_c_variant.ac = self.src_tm.tx_ac - new_c_variant.posedit.pos = self.project_interval_backward(c_variant.posedit.pos) + new_c_variant.posedit.pos = self.project_interval_backward( + c_variant.posedit.pos + ) return new_c_variant diff --git a/src/hgvs/repeats.py b/src/hgvs/repeats.py index 088ef86a..3d7e94b2 100644 --- a/src/hgvs/repeats.py +++ b/src/hgvs/repeats.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -""" A class to manage conversion of SequenceVariants to repeat representation""" +"""A class to manage conversion of SequenceVariants to repeat representation""" from dataclasses import dataclass import re @@ -10,15 +10,13 @@ @dataclass(eq=True, repr=True, frozen=True, order=True) class RepeatUnit: repeat_count: int - repeat_unit:str - block_size:int + repeat_unit: str + block_size: int block: str class RepeatAnalyser: - - def __init__(self, fs: VariantCoords, reverse:bool=False) -> None: - + def __init__(self, fs: VariantCoords, reverse: bool = False) -> None: self.is_repeat = False self.repeat_units_ref = None self.repeat_units_alt = None @@ -32,40 +30,58 @@ def __init__(self, fs: VariantCoords, reverse:bool=False) -> None: self.repeat_units_alt = [] return - self.repeat_units_ref = detect_repetitive_block_lengths(self.ref_str, reverse=self.reverse) - self.repeat_units_alt = detect_repetitive_block_lengths(self.alt_str, reverse = self.reverse) + self.repeat_units_ref = detect_repetitive_block_lengths( + self.ref_str, reverse=self.reverse + ) + self.repeat_units_alt = detect_repetitive_block_lengths( + self.alt_str, reverse=self.reverse + ) - if len(self.repeat_units_ref) == 0 and len(self.repeat_units_alt) ==0 : + if len(self.repeat_units_ref) == 0 and len(self.repeat_units_alt) == 0: return - + # check longest repeat blocks: - # we only look at ref to determine if there are repeats + # we only look 
at ref to determine if there are repeats # If ref has no repeat, we don't call this a repeat variant, even if alt would have a repetitive unit longest_r_unit = self._get_longest_repeat_unit(self.repeat_units_ref) if longest_r_unit is None: return - + # filter our too fragmented results expected_size = len(self.ref_str) / 3 - if longest_r_unit.block_size < expected_size: + if longest_r_unit.block_size < expected_size: return - + if longest_r_unit.repeat_unit not in self.alt_str: return - self.repeat_units_alt = detect_repetitive_block_lengths(self.alt_str, longest_ref_unit = longest_r_unit, reverse = self.reverse) + self.repeat_units_alt = detect_repetitive_block_lengths( + self.alt_str, longest_ref_unit=longest_r_unit, reverse=self.reverse + ) self.is_repeat = True - ref_repeat = get_repeat_str(self.ref_str, self.alt_str, self.repeat_units_ref, self.repeat_units_alt, self.reverse) - alt_repeat = get_repeat_str(self.alt_str, self.ref_str, self.repeat_units_alt, self.repeat_units_ref, self.reverse) + ref_repeat = get_repeat_str( + self.ref_str, + self.alt_str, + self.repeat_units_ref, + self.repeat_units_alt, + self.reverse, + ) + alt_repeat = get_repeat_str( + self.alt_str, + self.ref_str, + self.repeat_units_alt, + self.repeat_units_ref, + self.reverse, + ) self.ref_str = ref_repeat self.alt_str = alt_repeat - def __repr__(self): + def __repr__(self) -> str: return f"{self.ref_str}>{self.alt_str}" - def _get_longest_repeat_unit(self, repeat_units:list[RepeatUnit])->RepeatUnit: + def _get_longest_repeat_unit(self, repeat_units: list[RepeatUnit]) -> RepeatUnit: lru = None for ru in repeat_units: if not lru: @@ -78,7 +94,9 @@ def _get_longest_repeat_unit(self, repeat_units:list[RepeatUnit])->RepeatUnit: return lru -def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|None=None, reverse: bool = False) -> list[RepeatUnit]: +def detect_repetitive_block_lengths( + sequence: str, longest_ref_unit: RepeatUnit | None = None, reverse: bool = False +) 
-> list[RepeatUnit]: """Detects the length of repetitive blocks in a string, with an option to search from left to right or reverse. In reverse mode, it creates the largest possible blocks of the smallest possible units. @@ -89,21 +107,21 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N if longest_ref_unit is not None: # look for full containment of the longest ref repeat # this is so we can detect [2]>[1] (or really any repeat length variation to just 1) - pattern = f'({re.escape(longest_ref_unit.repeat_unit)})+' + pattern = f"({re.escape(longest_ref_unit.repeat_unit)})+" match = re.search(pattern, sequence) - + if match: repeat_count = len(sequence) // len(longest_ref_unit.repeat_unit) - block = repeat_count* longest_ref_unit.repeat_unit - ru = RepeatUnit(repeat_count, longest_ref_unit.repeat_unit, len(block), block) + block = repeat_count * longest_ref_unit.repeat_unit + ru = RepeatUnit( + repeat_count, longest_ref_unit.repeat_unit, len(block), block + ) result.append(ru) - shuffleable_bases = sequence[repeat_count*len(ru.repeat_unit):] - rus = detect_repetitive_block_lengths(shuffleable_bases, reverse = reverse) + shuffleable_bases = sequence[repeat_count * len(ru.repeat_unit) :] + rus = detect_repetitive_block_lengths(shuffleable_bases, reverse=reverse) result.extend(rus) return result - - if reverse: i = seq_len # Start from the end of the sequence while i > 0: @@ -114,7 +132,7 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N continue # Not enough characters to form a repeat unit # Extract the potential repeat unit ending at position i - repeat_unit = sequence[i - block_size:i] + repeat_unit = sequence[i - block_size : i] # Calculate the maximum possible number of repeats for this block size max_possible_repeats = i // block_size @@ -129,7 +147,9 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N substr = sequence[start_index:i] # Build the regex pattern for 
the current repeat unit and count - pattern = rf'({re.escape(repeat_unit)})' + r'{' + f'{repeat_count}' + r'}' + pattern = ( + rf"({re.escape(repeat_unit)})" + r"{" + f"{repeat_count}" + r"}" + ) # Check if the substring matches the pattern if re.fullmatch(pattern, substr): @@ -138,7 +158,7 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N block=repetitive_block, block_size=len(repetitive_block), repeat_unit=repeat_unit, - repeat_count=repeat_count + repeat_count=repeat_count, ) result.append(ru) # Move the index `i` backward by the length of the repetitive block @@ -155,7 +175,7 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N block=sequence[i - 1], block_size=1, repeat_unit=sequence[i - 1], - repeat_count=1 + repeat_count=1, ) result.append(ru) i -= 1 # Move back by one character if no match is found @@ -166,12 +186,12 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N matched = False for block_size in range(1, seq_len // 2 + 1): # Build the regex pattern for the current block size - pattern = rf'(.{{{block_size}}})\1+' + pattern = rf"(.{{{block_size}}})\1+" match = re.match(pattern, sequence[i:]) if match: repetitive_block = match.group() # The full repeating pattern - repeated_unit = match.group(1) # The repeating unit + repeated_unit = match.group(1) # The repeating unit repetition_count = len(repetitive_block) // len(repeated_unit) # Add the repetitive block and its details to the result @@ -179,7 +199,7 @@ def detect_repetitive_block_lengths(sequence: str, longest_ref_unit:RepeatUnit|N block=repetitive_block, block_size=len(repetitive_block), repeat_unit=repeated_unit, - repeat_count=repetition_count + repeat_count=repetition_count, ) result.append(ru) @@ -199,28 +219,43 @@ def get_repeat_str( other_seq: str, primary_repeat_unit: list[RepeatUnit], other_repeat_unit: list[RepeatUnit], - reverse: bool = False -) -> str: + reverse: bool = False, +) -> str | 
None: if len(primary_repeat_unit) == 0 and len(other_repeat_unit) == 0: return None - if len(primary_repeat_unit) == 0 and len(other_repeat_unit) == 1 and sequence == other_repeat_unit[0].repeat_unit: + if ( + len(primary_repeat_unit) == 0 + and len(other_repeat_unit) == 1 + and sequence == other_repeat_unit[0].repeat_unit + ): return f"{sequence}[1]" - elif len(primary_repeat_unit) == 0 and len(other_repeat_unit) == 1 and sequence != other_repeat_unit[0].repeat_unit: + elif ( + len(primary_repeat_unit) == 0 + and len(other_repeat_unit) == 1 + and sequence != other_repeat_unit[0].repeat_unit + ): return None if len(primary_repeat_unit) > 0 and len(other_repeat_unit) > 0: - return_str = assemble_repeat_string(sequence, primary_repeat_unit, reverse=reverse) + return_str = assemble_repeat_string( + sequence, primary_repeat_unit, reverse=reverse + ) return return_str if len(other_repeat_unit) == 0 and len(other_seq) > 0: - return_str = assemble_repeat_string(sequence, primary_repeat_unit, reverse=reverse) + return_str = assemble_repeat_string( + sequence, primary_repeat_unit, reverse=reverse + ) if len(return_str) > 0: return return_str return None -def assemble_repeat_string(sequence: str, repeat_units: list[RepeatUnit], reverse: bool = False) -> str: + +def assemble_repeat_string( + sequence: str, repeat_units: list[RepeatUnit], reverse: bool = False +) -> str: return_str = "" primary_repeat_unit = repeat_units.copy() seq = sequence @@ -230,7 +265,7 @@ def assemble_repeat_string(sequence: str, repeat_units: list[RepeatUnit], revers for ru in primary_repeat_unit: if seq.endswith(ru.block): return_str = f"{ru.repeat_unit}[{ru.repeat_count}]" + return_str - seq = seq[:-len(ru.block)] + seq = seq[: -len(ru.block)] found_unit = ru break if not found_unit: @@ -246,13 +281,13 @@ def assemble_repeat_string(sequence: str, repeat_units: list[RepeatUnit], revers return_str = f"{seq_char}[{count}]" + return_str - else: # forward direction + else: # forward direction while 
len(seq) > 0: found_unit = None for ru in primary_repeat_unit: if seq.startswith(ru.block): return_str += f"{ru.repeat_unit}[{ru.repeat_count}]" - seq = seq[len(ru.block):] + seq = seq[len(ru.block) :] found_unit = ru break if not found_unit: @@ -261,13 +296,11 @@ def assemble_repeat_string(sequence: str, repeat_units: list[RepeatUnit], revers seq_char = seq[0] seq = seq[1:] - # count consecutive repeating chars + # count consecutive repeating chars while seq and seq[0] == seq_char: count += 1 seq = seq[1:] - + return_str += f"{seq_char}[{count}]" - return return_str - diff --git a/src/hgvs/sequencevariant.py b/src/hgvs/sequencevariant.py index 8150bf49..4fcf4826 100644 --- a/src/hgvs/sequencevariant.py +++ b/src/hgvs/sequencevariant.py @@ -1,11 +1,13 @@ # -*- coding: utf-8 -*- -""" represents simple sequence-based variants """ +"""represents simple sequence-based variants""" import attr import hgvs.variantmapper from hgvs.enums import ValidationLevel from hgvs.utils.validation import validate_type_ac_pair +from hgvs.posedit import PosEdit +from hgvs.config import Config @attr.s(slots=True, repr=False) @@ -16,12 +18,12 @@ class SequenceVariant: or an hgvs.location.CDSInterval (for example) are both intended uses """ - ac = attr.ib() - type = attr.ib() - posedit = attr.ib() - gene = attr.ib(default=None) + ac: str = attr.ib() + type: str = attr.ib() + posedit: PosEdit = attr.ib() + gene: str | None = attr.ib(default=None) - def format(self, conf=None): + def format(self, conf: Config | None = None) -> str: """Formatting the stringification of sequence variants :param conf: a dict comprises formatting options. None is to use global settings. 
@@ -47,13 +49,21 @@ def format(self, conf=None): __str__ = format - def __repr__(self): + def __repr__(self) -> str: return "{0}({1})".format( self.__class__.__name__, - ", ".join((a.name + "=" + str(getattr(self, a.name))) for a in self.__attrs_attrs__), + ", ".join( + (a.name + "=" + str(getattr(self, a.name))) + for a in self.__attrs_attrs__ + ), ) - def fill_ref(self, hdp, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def fill_ref( + self, + hdp: hgvs.variantmapper.VariantMapper, + alt_ac=None, + alt_aln_method=hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": # TODO: Refactor. SVs should not operate on themselves when # external resources are required # replace_reference should be moved outside function @@ -72,7 +82,7 @@ def fill_ref(self, hdp, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.a self.posedit.edit.alt = self.posedit.edit.ref return self - def validate(self): + def validate(self) -> tuple[str, str | None]: (res, msg) = (ValidationLevel.VALID, None) if self.ac and self.type: (res, msg) = validate_type_ac_pair(self.type, self.ac) diff --git a/src/hgvs/transcriptmapper.py b/src/hgvs/transcriptmapper.py index 122c3de3..efc29b44 100644 --- a/src/hgvs/transcriptmapper.py +++ b/src/hgvs/transcriptmapper.py @@ -32,7 +32,13 @@ class TranscriptMapper: """ - def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): + def __init__( + self, + hdp: hgvs.dataproviders.interface.Interface, + tx_ac: str, + alt_ac: str, + alt_aln_method: str, + ) -> None: self.hdp = hdp self.tx_ac = tx_ac self.alt_ac = alt_ac @@ -46,7 +52,9 @@ def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): "No transcript info".format(self=self) ) - self.tx_exons = hdp.get_tx_exons(self.tx_ac, self.alt_ac, self.alt_aln_method) + self.tx_exons = hdp.get_tx_exons( + self.tx_ac, self.alt_ac, self.alt_aln_method + ) if self.tx_exons is None: raise HGVSDataNotAvailableError( "TranscriptMapper(tx_ac={self.tx_ac}, " @@ -62,7 +70,9 @@ def 
__init__(self, hdp, tx_ac, alt_ac, alt_aln_method): raise HGVSDataNotAvailableError( "TranscriptMapper(tx_ac={self.tx_ac}, " "alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): " - "Exons {a} and {b} are not adjacent".format(self=self, a=i, b=i + 1) + "Exons {a} and {b} are not adjacent".format( + self=self, a=i, b=i + 1 + ) ) self.strand = self.tx_exons[0]["alt_strand"] @@ -85,28 +95,34 @@ def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): self.cds_end_i = self.tx_identity_info["cds_end_i"] self.tgt_len = sum(self.tx_identity_info["lengths"]) - assert not ( - (self.cds_start_i is None) ^ (self.cds_end_i is None) - ), "CDS start and end must both be defined or neither defined" + assert not ((self.cds_start_i is None) ^ (self.cds_end_i is None)), ( + "CDS start and end must both be defined or neither defined" + ) - def __str__(self): + def __str__(self) -> str: return ( "{self.__class__.__name__}: {self.tx_ac} ~ {self.alt_ac} ~ {self.alt_aln_method}; " "{strand_pm} strand; {n_exons} exons; offset={self.gc_offset}".format( - self=self, n_exons=len(self.tx_exons), strand_pm=strand_int_to_pm(self.strand) + self=self, + n_exons=len(self.tx_exons), + strand_pm=strand_int_to_pm(self.strand), ) ) @property - def is_coding_transcript(self): - if (self.tx_info["cds_start_i"] is not None) ^ (self.tx_info["cds_end_i"] is not None): + def is_coding_transcript(self) -> bool: + if (self.tx_info["cds_start_i"] is not None) ^ ( + self.tx_info["cds_end_i"] is not None + ): raise HGVSError( "{self.tx_ac}: CDS start_i and end_i" " must be both defined or both undefined".format(self=self) ) return self.tx_info["cds_start_i"] is not None - def g_to_n(self, g_interval): + def g_to_n( + self, g_interval: hgvs.location.BaseOffsetInterval + ) -> hgvs.location.BaseOffsetInterval: """convert a genomic (g.) interval to a transcript cDNA (n.) interval""" # This code is extremely convoluted. 
To begin with, it @@ -153,7 +169,10 @@ def map_g_to_n_pos(pos): start_offset = _hgvs_offset(g_ci[0], grs, gre, self.strand) end_offset = _hgvs_offset(g_ci[1], grs, gre, self.strand) if self.strand == -1: - start_offset, end_offset = self.strand * end_offset, self.strand * start_offset + start_offset, end_offset = ( + self.strand * end_offset, + self.strand * start_offset, + ) if start_offset > 0: frs -= 1 if end_offset < 0: @@ -181,10 +200,14 @@ def map_g_to_n_pos(pos): uncertain=g_interval.uncertain, ) - def n_to_g(self, n_interval): + def n_to_g( + self, n_interval: hgvs.location.BaseOffsetInterval + ) -> hgvs.location.BaseOffsetInterval: """convert a transcript cDNA (n.) interval to a genomic (g.) interval""" - assert self.strand in [1, -1], "strand = " + str(self.strand) + "; must be 1 or -1" + assert self.strand in [1, -1], ( + "strand = " + str(self.strand) + "; must be 1 or -1" + ) if self.strand == 1: frs, fre = _hgvs_coord_to_ci(n_interval.start.base, n_interval.end.base) @@ -211,7 +234,9 @@ def n_to_g(self, n_interval): uncertain=n_interval.uncertain, ) - def n_to_c(self, n_interval): + def n_to_c( + self, n_interval: hgvs.location.BaseOffsetInterval + ) -> hgvs.location.BaseOffsetInterval: """convert a transcript cDNA (n.) interval to a transcript CDS (c.) interval""" if ( @@ -223,13 +248,18 @@ def n_to_c(self, n_interval): ) ) if n_interval.start.base <= 0 or n_interval.end.base > self.tgt_len: - raise HGVSError("The given coordinate is outside the bounds of the reference sequence.") + raise HGVSError( + "The given coordinate is outside the bounds of the reference sequence." 
+ ) # start if n_interval.start.base <= self.cds_start_i: cs = n_interval.start.base - (self.cds_start_i + 1) cs_datum = Datum.CDS_START - elif n_interval.start.base > self.cds_start_i and n_interval.start.base <= self.cds_end_i: + elif ( + n_interval.start.base > self.cds_start_i + and n_interval.start.base <= self.cds_end_i + ): cs = n_interval.start.base - self.cds_start_i cs_datum = Datum.CDS_START else: @@ -239,7 +269,10 @@ def n_to_c(self, n_interval): if n_interval.end.base <= self.cds_start_i: ce = n_interval.end.base - (self.cds_start_i + 1) ce_datum = Datum.CDS_START - elif n_interval.end.base > self.cds_start_i and n_interval.end.base <= self.cds_end_i: + elif ( + n_interval.end.base > self.cds_start_i + and n_interval.end.base <= self.cds_end_i + ): ce = n_interval.end.base - self.cds_start_i ce_datum = Datum.CDS_START else: @@ -257,7 +290,9 @@ def n_to_c(self, n_interval): ) return c_interval - def c_to_n(self, c_interval): + def c_to_n( + self, c_interval: hgvs.location.BaseOffsetInterval + ) -> hgvs.location.BaseOffsetInterval: """convert a transcript CDS (c.) interval to a transcript cDNA (n.) interval""" if ( @@ -285,7 +320,9 @@ def c_to_n(self, c_interval): re = c_interval.end.base + self.cds_end_i if rs <= 0 or re > self.tgt_len: - raise HGVSError("The given coordinate is outside the bounds of the reference sequence.") + raise HGVSError( + "The given coordinate is outside the bounds of the reference sequence." + ) n_interval = hgvs.location.BaseOffsetInterval( start=hgvs.location.BaseOffsetPosition( @@ -298,16 +335,20 @@ def c_to_n(self, c_interval): ) return n_interval - def g_to_c(self, g_interval): + def g_to_c( + self, g_interval: hgvs.location.BaseOffsetInterval + ) -> hgvs.location.BaseOffsetInterval: """convert a genomic (g.) interval to a transcript CDS (c.) 
interval""" return self.n_to_c(self.g_to_n(g_interval)) - def c_to_g(self, c_interval): + def c_to_g( + self, c_interval: hgvs.location.BaseOffsetInterval + ) -> hgvs.location.BaseOffsetInterval: """convert a transcript CDS (c.) interval to a genomic (g.) interval""" return self.n_to_g(self.c_to_n(c_interval)) -def _ci_to_hgvs_coord(s, e): +def _ci_to_hgvs_coord(s: int | None, e: int | None) -> tuple[int | None, int | None]: """Convert continuous interbase (right-open) coordinates (..,-2,-1,0,1,..) to discontinuous HGVS coordinates (..,-2,-1,1,2,..) """ @@ -315,10 +356,13 @@ def _ci_to_hgvs_coord(s, e): def _ci_to_hgvs(c): return c + 1 if c >= 0 else c - return (None if s is None else _ci_to_hgvs(s), None if e is None else _ci_to_hgvs(e) - 1) + return ( + None if s is None else _ci_to_hgvs(s), + None if e is None else _ci_to_hgvs(e) - 1, + ) -def _hgvs_coord_to_ci(s, e): +def _hgvs_coord_to_ci(s: int | None, e: int | None) -> tuple[int | None, int | None]: """convert start,end interval in inclusive, discontinuous HGVS coordinates (..,-2,-1,1,2,..) to continuous interbase (right-open) coordinates (..,-2,-1,0,1,..)""" @@ -327,7 +371,10 @@ def _hgvs_to_ci(c): assert c != 0, "received CDS coordinate 0; expected ..,-2,-1,1,1,..."
return c - 1 if c > 0 else c - return (None if s is None else _hgvs_to_ci(s), None if e is None else _hgvs_to_ci(e) + 1) + return ( + None if s is None else _hgvs_to_ci(s), + None if e is None else _hgvs_to_ci(e) + 1, + ) # diff --git a/src/hgvs/validator.py b/src/hgvs/validator.py index bfaefece..7e634064 100644 --- a/src/hgvs/validator.py +++ b/src/hgvs/validator.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -"""implements validation of hgvs variants - -""" +"""implements validation of hgvs variants""" import logging @@ -12,9 +10,7 @@ from hgvs.enums import Datum, ValidationLevel from hgvs.exceptions import HGVSInvalidVariantError -SEQ_ERROR_MSG = ( - "Variant reference ({var_ref_seq}) does not agree with reference sequence ({ref_seq})" -) +SEQ_ERROR_MSG = "Variant reference ({var_ref_seq}) does not agree with reference sequence ({ref_seq})" CDS_BOUND_ERROR_MSG = "Variant is outside CDS bounds (CDS length : {cds_length})" TX_BOUND_ERROR_MSG = "Variant is outside the transcript bounds" @@ -28,12 +24,18 @@ class Validator: """invoke intrinsic and extrinsic validation""" - def __init__(self, hdp, strict=hgvs.global_config.validator.strict): + def __init__( + self, + hdp: hgvs.dataproviders.interface.Interface, + strict=hgvs.global_config.validator.strict, + ) -> None: self.strict = strict self._ivr = IntrinsicValidator(strict) self._evr = ExtrinsicValidator(hdp, strict) - def validate(self, var, strict=None): + def validate( + self, var: hgvs.sequencevariant.SequenceVariant, strict: bool = None + ) -> bool: if strict is None: strict = self.strict return self._ivr.validate(var, strict) and self._evr.validate(var, strict) @@ -45,13 +47,17 @@ class IntrinsicValidator: """ - def __init__(self, strict=hgvs.global_config.validator.strict): + def __init__(self, strict=hgvs.global_config.validator.strict) -> None: self.strict = strict - def validate(self, var, strict=None): - assert isinstance( - var, hgvs.sequencevariant.SequenceVariant - ), "variant must be a parsed HGVS 
sequence variant object" + def validate( + self, + var: hgvs.sequencevariant.SequenceVariant, + strict: bool | None = None, + ) -> bool: + assert isinstance(var, hgvs.sequencevariant.SequenceVariant), ( + "variant must be a parsed HGVS sequence variant object" + ) if strict is None: strict = self.strict fail_level = ValidationLevel.WARNING if strict else ValidationLevel.ERROR @@ -66,15 +72,23 @@ class ExtrinsicValidator: Attempts to determine if the HGVS name validates against external data sources """ - def __init__(self, hdp, strict=hgvs.global_config.validator.strict): + def __init__( + self, + hdp: hgvs.dataproviders.interface.Interface, + strict: bool = hgvs.global_config.validator.strict, + ) -> None: self.strict = strict self.hdp = hdp self.vm = hgvs.variantmapper.VariantMapper(self.hdp, prevalidation_level=None) - def validate(self, var, strict=None): - assert isinstance( - var, hgvs.sequencevariant.SequenceVariant - ), "variant must be a parsed HGVS sequence variant object" + def validate( + self, + var: hgvs.sequencevariant.SequenceVariant, + strict: bool | None = None, + ) -> bool: + assert isinstance(var, hgvs.sequencevariant.SequenceVariant), ( + "variant must be a parsed HGVS sequence variant object" + ) if strict is None: strict = self.strict fail_level = ValidationLevel.WARNING if strict else ValidationLevel.ERROR @@ -91,7 +105,8 @@ def validate(self, var, strict=None): if hgvs.global_config.mapping.strict_bounds: raise HGVSInvalidVariantError(msg) _logger.warning( - "{}: Variant outside transcript bounds;" " no validation provided".format(var) + "{}: Variant outside transcript bounds;" + " no validation provided".format(var) ) return True # no other checking performed @@ -105,7 +120,9 @@ def validate(self, var, strict=None): return True - def _ref_is_valid(self, var): + def _ref_is_valid( + self, var: hgvs.sequencevariant.SequenceVariant + ) -> tuple[str, str | None]: # use reference sequence of original variant, even if later converted (eg c_to_n) if (
var.type in BASE_OFFSET_COORD_TYPES @@ -147,7 +164,12 @@ def _ref_is_valid(self, var): var_ref_seq = getattr(var.posedit.edit, "ref", None) or None var_n = self.vm.c_to_n(var) if var.type == "c" else var ref_checks.append( - (var_n.ac, var_n.posedit.pos.start.base, var_n.posedit.pos.end.base, var_ref_seq) + ( + var_n.ac, + var_n.posedit.pos.start.base, + var_n.posedit.pos.end.base, + var_ref_seq, + ) ) for ac, var_ref_start, var_ref_end, var_ref_seq in ref_checks: @@ -172,7 +194,9 @@ def _ref_is_valid(self, var): return (ValidationLevel.VALID, None) - def _c_within_cds_bound(self, var): + def _c_within_cds_bound( + self, var: hgvs.sequencevariant.SequenceVariant + ) -> tuple[str, str | None]: if var.type != "c": return (ValidationLevel.VALID, None) tx_info = self.hdp.get_tx_identity_info(var.ac) @@ -186,12 +210,23 @@ def _c_within_cds_bound(self, var): var.posedit.pos.start.datum == Datum.CDS_START and var.posedit.pos.start.base > cds_length ): - return (ValidationLevel.ERROR, CDS_BOUND_ERROR_MSG.format(cds_length=cds_length)) - if var.posedit.pos.end.datum == Datum.CDS_START and var.posedit.pos.end.base > cds_length: - return (ValidationLevel.ERROR, CDS_BOUND_ERROR_MSG.format(cds_length=cds_length)) + return ( + ValidationLevel.ERROR, + CDS_BOUND_ERROR_MSG.format(cds_length=cds_length), + ) + if ( + var.posedit.pos.end.datum == Datum.CDS_START + and var.posedit.pos.end.base > cds_length + ): + return ( + ValidationLevel.ERROR, + CDS_BOUND_ERROR_MSG.format(cds_length=cds_length), + ) return (ValidationLevel.VALID, None) - def _n_within_transcript_bounds(self, var): + def _n_within_transcript_bounds( + self, var: hgvs.sequencevariant.SequenceVariant + ) -> tuple[str, str | None]: if var.type != "n": return (ValidationLevel.VALID, None) tx_info = self.hdp.get_tx_identity_info(var.ac) @@ -201,9 +236,15 @@ def _n_within_transcript_bounds(self, var): ValidationLevel.WARNING, "No transcript data for accession: {ac}".format(ac=var.ac), ) - if var.posedit.pos.start.datum 
== Datum.SEQ_START and var.posedit.pos.start.base <= 0: + if ( + var.posedit.pos.start.datum == Datum.SEQ_START + and var.posedit.pos.start.base <= 0 + ): return (ValidationLevel.ERROR, TX_BOUND_ERROR_MSG.format()) - if var.posedit.pos.end.datum == Datum.SEQ_START and var.posedit.pos.end.base > tx_len: + if ( + var.posedit.pos.end.datum == Datum.SEQ_START + and var.posedit.pos.end.base > tx_len + ): return (ValidationLevel.ERROR, TX_BOUND_ERROR_MSG.format()) return (ValidationLevel.VALID, None) diff --git a/src/hgvs/variantmapper.py b/src/hgvs/variantmapper.py index d514beaa..b0932e3d 100644 --- a/src/hgvs/variantmapper.py +++ b/src/hgvs/variantmapper.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -"""Projects variants between sequences using AlignmentMapper. - -""" +"""Projects variants between sequences using AlignmentMapper.""" import copy import logging @@ -11,7 +9,7 @@ import hgvs import hgvs.alignmentmapper import hgvs.edit -import hgvs.location +import hgvs.intervalmapper import hgvs.normalizer import hgvs.posedit import hgvs.sequencevariant @@ -23,6 +21,11 @@ from hgvs.exceptions import HGVSInvalidVariantError, HGVSUnsupportedOperationError from hgvs.utils.reftranscriptdata import RefTranscriptData +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from hgvs.sequencevariant import SequenceVariant + _logger = logging.getLogger(__name__) @@ -67,11 +70,11 @@ class VariantMapper: def __init__( self, - hdp, + hdp: hgvs.dataproviders.interface.Interface, replace_reference=hgvs.global_config.mapping.replace_reference, prevalidation_level=hgvs.global_config.mapping.prevalidation_level, add_gene_symbol=hgvs.global_config.mapping.add_gene_symbol, - ): + ) -> None: """ :param bool replace_reference: replace reference (entails additional network access) :param str prevalidation_level: None or Intrinsic or Extrinsic validation before mapping @@ -96,9 +99,16 @@ def __init__( # ############################################################################ # g⟷t - def 
g_to_t(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def g_to_t( + self, + var_g: "SequenceVariant", + tx_ac: str, + alt_aln_method=hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": if var_g.type not in "gm": - raise HGVSInvalidVariantError("Expected a g. or m. variant; got " + str(var_g)) + raise HGVSInvalidVariantError( + "Expected a g. or m. variant; got " + str(var_g) + ) if self._validator: self._validator.validate(var_g) var_g.fill_ref(self.hdp) @@ -115,9 +125,16 @@ def g_to_t(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln ) return var_out - def t_to_g(self, var_t, alt_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def t_to_g( + self, + var_t: "SequenceVariant", + alt_ac: str, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": if var_t.type not in "cn": - raise HGVSInvalidVariantError("Expected a c. or n. variant; got " + str(var_t)) + raise HGVSInvalidVariantError( + "Expected a c. or n. variant; got " + str(var_t) + ) if self._validator: self._validator.validate(var_t) var_t.fill_ref(self.hdp, alt_ac=alt_ac, alt_aln_method=alt_aln_method) @@ -133,7 +150,12 @@ def t_to_g(self, var_t, alt_ac, alt_aln_method=hgvs.global_config.mapping.alt_al # ############################################################################ # g⟷n - def g_to_n(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def g_to_n( + self, + var_g: "SequenceVariant", + tx_ac: str, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """Given a parsed g. variant, return a n. variant on the specified transcript using the specified alignment method (default is "splign" from NCBI). @@ -147,7 +169,9 @@ def g_to_n(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln """ if var_g.type not in "gm": - raise HGVSInvalidVariantError("Expected a g. or m. 
variant; got " + str(var_g)) + raise HGVSInvalidVariantError( + "Expected a g. or m. variant; got " + str(var_g) + ) if self._validator: self._validator.validate(var_g) mapper = self._fetch_AlignmentMapper( @@ -159,7 +183,9 @@ def g_to_n(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln and not hgvs.global_config.mapping.strict_bounds and not mapper.g_interval_is_inbounds(var_g.posedit.pos) ): - _logger.info("Renormalizing out-of-bounds minus strand variant on genomic sequence") + _logger.info( + "Renormalizing out-of-bounds minus strand variant on genomic sequence" + ) var_g = self.left_normalizer.normalize(var_g) var_g.fill_ref(self.hdp) @@ -195,7 +221,12 @@ def g_to_n(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln self._update_gene_symbol(var_n, var_g.gene) return var_n - def n_to_g(self, var_n, alt_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def n_to_g( + self, + var_n: "SequenceVariant", + alt_ac: str, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """Given a parsed n. variant, return a g. variant on the specified transcript using the specified alignment method (default is "splign" from NCBI). @@ -240,7 +271,12 @@ def n_to_g(self, var_n, alt_ac, alt_aln_method=hgvs.global_config.mapping.alt_al # ############################################################################ # g⟷c - def g_to_c(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def g_to_c( + self, + var_g: "SequenceVariant", + tx_ac: str, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """Given a parsed g. variant, return a c. variant on the specified transcript using the specified alignment method (default is "splign" from NCBI). @@ -254,7 +290,9 @@ def g_to_c(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln """ if var_g.type not in "gm": - raise HGVSInvalidVariantError("Expected a g. or m. 
variant; got " + str(var_g)) + raise HGVSInvalidVariantError( + "Expected a g. or m. variant; got " + str(var_g) + ) if self._validator: self._validator.validate(var_g) var_g.fill_ref(self.hdp) @@ -290,7 +328,12 @@ def g_to_c(self, var_g, tx_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln self._update_gene_symbol(var_c, var_g.gene) return var_c - def c_to_g(self, var_c, alt_ac, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def c_to_g( + self, + var_c: "SequenceVariant", + alt_ac: str, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """Given a parsed c. variant, return a g. variant on the specified transcript using the specified alignment method (default is "splign" from NCBI). @@ -337,7 +380,12 @@ def c_to_g(self, var_c, alt_ac, alt_aln_method=hgvs.global_config.mapping.alt_al # ############################################################################ # c⟷n - def c_to_n(self, var_c, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def c_to_n( + self, + var_c: "SequenceVariant", + alt_ac: str | None = None, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """Given a parsed c. variant, return a n. variant on the specified transcript using the specified alignment method (default is "transcript" indicating a self alignment). @@ -376,7 +424,12 @@ def c_to_n(self, var_c, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.a self._update_gene_symbol(var_n, var_c.gene) return var_n - def n_to_c(self, var_n, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def n_to_c( + self, + var_n: "SequenceVariant", + alt_ac: str | None = None, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """Given a parsed n. variant, return a c. variant on the specified transcript using the specified alignment method (default is "transcript" indicating a self alignment). 
@@ -417,7 +470,14 @@ def n_to_c(self, var_n, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.a # ############################################################################ # c ⟶ p - def c_to_p(self, var_c, pro_ac=None, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.alt_aln_method, translation_table=TranslationTable.standard): + def c_to_p( + self, + var_c: "SequenceVariant", + pro_ac: str | None = None, + alt_ac: str | None = None, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + translation_table=TranslationTable.standard, + ) -> "SequenceVariant": """ Converts a c. SequenceVariant to a p. SequenceVariant on the specified protein accession Author: Rudy Rico @@ -429,12 +489,18 @@ def c_to_p(self, var_c, pro_ac=None, alt_ac=None, alt_aln_method=hgvs.global_con """ if not (var_c.type == "c"): - raise HGVSInvalidVariantError("Expected a cDNA (c.) variant; got " + str(var_c)) + raise HGVSInvalidVariantError( + "Expected a cDNA (c.) variant; got " + str(var_c) + ) if self._validator: self._validator.validate(var_c) var_c.fill_ref(self.hdp, alt_ac=alt_ac, alt_aln_method=alt_aln_method) - reference_data = RefTranscriptData(self.hdp, var_c.ac, pro_ac, translation_table=translation_table) - builder = altseqbuilder.AltSeqBuilder(var_c, reference_data, translation_table=translation_table) + reference_data = RefTranscriptData( + self.hdp, var_c.ac, pro_ac, translation_table=translation_table + ) + builder = altseqbuilder.AltSeqBuilder( + var_c, reference_data, translation_table=translation_table + ) # TODO: handle case where you get 2+ alt sequences back; # currently get list of 1 element loop structure implemented @@ -457,11 +523,18 @@ def c_to_p(self, var_c, pro_ac=None, alt_ac=None, alt_aln_method=hgvs.global_con ############################################################################ # Internal methods - def _replace_reference(self, var, alt_ac=None, alt_aln_method=hgvs.global_config.mapping.alt_aln_method): + def 
_replace_reference( + self, + var: "SequenceVariant", + alt_ac: str | None = None, + alt_aln_method: str = hgvs.global_config.mapping.alt_aln_method, + ) -> "SequenceVariant": """fetch reference sequence for variant and update (in-place) if necessary""" if var.type not in "cgmnr": - raise HGVSUnsupportedOperationError("Can only update references for type c, g, m, n, r") + raise HGVSUnsupportedOperationError( + "Can only update references for type c, g, m, n, r" + ) if var.posedit.edit.type in ("ins", "con"): # these types have no reference sequence (zero-width), so return as-is @@ -474,10 +547,15 @@ def _replace_reference(self, var, alt_ac=None, alt_aln_method=hgvs.global_config and (var.posedit.pos.start.offset != 0 or var.posedit.pos.end.offset != 0) ): if var.type == "r": - _logger.info("Can't update reference sequence for intronic variant %s", var) + _logger.info( + "Can't update reference sequence for intronic variant %s", var + ) return var if alt_ac is None: - _logger.info("Can't update reference sequence for intronic variant %s without alt_ac", var) + _logger.info( + "Can't update reference sequence for intronic variant %s without alt_ac", + var, + ) return var if var.type == "c": mapper = self._fetch_AlignmentMapper( @@ -532,14 +610,18 @@ def _replace_reference(self, var, alt_ac=None, alt_aln_method=hgvs.global_config edit = var.posedit.edit if edit.ref != seq: _logger.debug( - "Replaced reference sequence in {var} with {seq}".format(var=var, seq=seq) + "Replaced reference sequence in {var} with {seq}".format( + var=var, seq=seq + ) ) edit.ref = seq return var @lru_cache(maxsize=hgvs.global_config.lru_cache.maxsize) - def _fetch_AlignmentMapper(self, tx_ac, alt_ac, alt_aln_method): + def _fetch_AlignmentMapper( + self, tx_ac: str, alt_ac: str, alt_aln_method: str + ) -> hgvs.alignmentmapper.AlignmentMapper: """ Get a new AlignmentMapper for the given transcript accession (ac), possibly caching the result. 
@@ -549,7 +631,9 @@ def _fetch_AlignmentMapper(self, tx_ac, alt_ac, alt_aln_method): ) @staticmethod - def _convert_edit_check_strand(strand, edit_in): + def _convert_edit_check_strand( + strand: int, edit_in: hgvs.edit.NARefAlt | hgvs.edit.Dup | hgvs.edit.Inv + ) -> hgvs.edit.NARefAlt | hgvs.edit.Dup | hgvs.edit.Inv: """ Convert an edit from one type to another, based on the stand and type """ @@ -584,10 +668,14 @@ def _convert_edit_check_strand(strand, edit_in): ref = reverse_complement(edit_in.ref) edit_out = hgvs.edit.Inv(ref=ref) else: - raise NotImplementedError("Only NARefAlt/Dup/Inv types are currently implemented") + raise NotImplementedError( + "Only NARefAlt/Dup/Inv types are currently implemented" + ) return edit_out - def _get_altered_sequence(self, strand, interval, var): + def _get_altered_sequence( + self, strand: int, interval: hgvs.intervalmapper.Interval, var + ) -> str: seq = list(self.hdp.get_seq(var.ac, interval.start.base - 1, interval.end.base)) # positions are 0-based and half-open pos_start = var.posedit.pos.start.base - interval.start.base @@ -606,12 +694,16 @@ def _get_altered_sequence(self, strand, interval, var): elif edit.type == "dup": seq.insert(pos_end, "".join(seq[pos_start:pos_end])) elif edit.type == "inv": - seq[pos_start:pos_end] = list(reverse_complement("".join(seq[pos_start:pos_end]))) + seq[pos_start:pos_end] = list( + reverse_complement("".join(seq[pos_start:pos_end])) + ) elif edit.type == "identity": pass else: raise HGVSUnsupportedOperationError( - "Getting altered sequence for {type} is unsupported".format(type=edit.type) + "Getting altered sequence for {type} is unsupported".format( + type=edit.type + ) ) seq = "".join(seq) @@ -619,7 +711,9 @@ def _get_altered_sequence(self, strand, interval, var): seq = reverse_complement(seq) return seq - def _update_gene_symbol(self, var, symbol): + def _update_gene_symbol( + self, var: "SequenceVariant", symbol: str + ) -> "SequenceVariant": if not symbol: symbol = 
self.hdp.get_tx_identity_info(var.ac).get("hgnc", None) var.gene = symbol