|
| 1 | +"""Module for getting feature (gene/exon) overlap""" |
| 2 | + |
| 3 | +import re |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import polars as pl |
| 7 | +from ga4gh.core import ga4gh_identify |
| 8 | +from ga4gh.vrs.models import SequenceLocation, SequenceReference |
| 9 | + |
| 10 | +from cool_seq_tool.handlers import SeqRepoAccess |
| 11 | +from cool_seq_tool.resources.data_files import DataFile, get_data_file |
| 12 | +from cool_seq_tool.schemas import Assembly, CdsOverlap, CoordinateType |
| 13 | + |
| 14 | +# Pattern for chromosome |
| 15 | +CHR_PATTERN = r"X|Y|([1-9]|1[0-9]|2[0-2])" |
| 16 | + |
| 17 | + |
| 18 | +class FeatureOverlapError(Exception): |
| 19 | + """Custom exception for the Feature Overlap class""" |
| 20 | + |
| 21 | + |
| 22 | +class FeatureOverlap: |
| 23 | + """The class for getting feature overlap""" |
| 24 | + |
| 25 | + def __init__( |
| 26 | + self, |
| 27 | + seqrepo_access: SeqRepoAccess, |
| 28 | + mane_refseq_genomic_path: Path | None = None, |
| 29 | + from_local: bool = False, |
| 30 | + ) -> None: |
| 31 | + """Initialize the FeatureOverlap class. Will load RefSeq data and store as df. |
| 32 | +
|
| 33 | + :param seqrepo_access: Client for accessing SeqRepo data |
| 34 | + :param mane_refseq_genomic_path: Path to MANE RefSeq Genomic GFF data |
| 35 | + :param from_local: if ``True``, don't check for or acquire latest version -- |
| 36 | + just provide most recent locally available file, if possible, and raise |
| 37 | + error otherwise |
| 38 | + """ |
| 39 | + if not mane_refseq_genomic_path: |
| 40 | + mane_refseq_genomic_path = get_data_file( |
| 41 | + DataFile.MANE_REFSEQ_GENOMIC, from_local |
| 42 | + ) |
| 43 | + self.seqrepo_access = seqrepo_access |
| 44 | + self.mane_refseq_genomic_path = mane_refseq_genomic_path |
| 45 | + self.df = self._load_mane_refseq_gff_data() |
| 46 | + |
| 47 | + def _load_mane_refseq_gff_data(self) -> pl.DataFrame: |
| 48 | + """Load MANE RefSeq GFF data file into DataFrame. |
| 49 | +
|
| 50 | + :return: DataFrame containing MANE RefSeq Genomic GFF data for CDS. Columns |
| 51 | + include `type`, `chromosome` (chromosome without 'chr' prefix), `cds_start`, |
| 52 | + `cds_stop`, `info_name` (name of record), and `gene`. `cds_start` and |
| 53 | + `cds_stop` use inter-residue coordinates. |
| 54 | + """ |
| 55 | + df = pl.read_csv( |
| 56 | + self.mane_refseq_genomic_path, |
| 57 | + separator="\t", |
| 58 | + has_header=False, |
| 59 | + skip_rows=9, |
| 60 | + columns=[0, 2, 3, 4, 8], |
| 61 | + ) |
| 62 | + df.columns = ["chromosome", "type", "cds_start", "cds_stop", "info"] |
| 63 | + |
| 64 | + # Restrict to only feature of interest: CDS (which has gene info) |
| 65 | + df = df.filter(pl.col("type") == "CDS") |
| 66 | + |
| 67 | + # Get name from the info field |
| 68 | + # Get gene from the info field |
| 69 | + # Get chromosome names without prefix and without suffix for alternate transcripts |
| 70 | + # Convert start and stop to ints |
| 71 | + # Convert to inter-residue coordinates |
| 72 | + # Only return certain columns |
| 73 | + return df.with_columns( |
| 74 | + (pl.col("info").str.extract(r"Name=([^;]+)", 1).alias("info_name")), |
| 75 | + (pl.col("info").str.extract(r"gene=([^;]+)", 1).alias("gene")), |
| 76 | + (pl.col("chromosome").str.extract(r"^chr?([^_]+)", 1).alias("chromosome")), |
| 77 | + (pl.col("cds_start").cast(pl.Int64) - 1).alias("cds_start"), |
| 78 | + (pl.col("cds_stop").cast(pl.Int64).alias("cds_stop")), |
| 79 | + ).select( |
| 80 | + [ |
| 81 | + pl.col("type"), |
| 82 | + pl.col("chromosome"), |
| 83 | + pl.col("cds_start"), |
| 84 | + pl.col("cds_stop"), |
| 85 | + pl.col("info_name"), |
| 86 | + pl.col("gene"), |
| 87 | + ] |
| 88 | + ) |
| 89 | + |
| 90 | + def _get_chr_from_alt_ac(self, identifier: str) -> str: |
| 91 | + """Get chromosome given genomic identifier |
| 92 | +
|
| 93 | + :param identifier: Genomic identifier on GRCh38 assembly |
| 94 | + :raises FeatureOverlapError: If unable to find associated GRCh38 chromosome |
| 95 | + :return: Chromosome. 1..22, X, Y. No 'chr' prefix. |
| 96 | + """ |
| 97 | + aliases, error_msg = self.seqrepo_access.translate_identifier( |
| 98 | + identifier, Assembly.GRCH38.value |
| 99 | + ) |
| 100 | + |
| 101 | + if error_msg: |
| 102 | + raise FeatureOverlapError(str(error_msg)) |
| 103 | + |
| 104 | + if not aliases: |
| 105 | + error_msg = ( |
| 106 | + f"Unable to find {Assembly.GRCH38.value} aliases for: {identifier}" |
| 107 | + ) |
| 108 | + raise FeatureOverlapError(error_msg) |
| 109 | + |
| 110 | + assembly_chr_pattern = ( |
| 111 | + rf"^{Assembly.GRCH38.value}:(?P<chromosome>{CHR_PATTERN})$" |
| 112 | + ) |
| 113 | + for a in aliases: |
| 114 | + chr_match = re.match(assembly_chr_pattern, a) |
| 115 | + if chr_match: |
| 116 | + break |
| 117 | + |
| 118 | + if not chr_match: |
| 119 | + error_msg = ( |
| 120 | + f"Unable to find {Assembly.GRCH38.value} chromosome for: {identifier}" |
| 121 | + ) |
| 122 | + raise FeatureOverlapError(error_msg) |
| 123 | + |
| 124 | + chr_groupdict = chr_match.groupdict() |
| 125 | + return chr_groupdict["chromosome"] |
| 126 | + |
| 127 | + def get_grch38_mane_gene_cds_overlap( |
| 128 | + self, |
| 129 | + start: int, |
| 130 | + end: int, |
| 131 | + chromosome: str | None = None, |
| 132 | + identifier: str | None = None, |
| 133 | + coordinate_type: CoordinateType = CoordinateType.RESIDUE, |
| 134 | + ) -> dict[str, list[CdsOverlap]] | None: |
| 135 | + """Given GRCh38 genomic data, find the overlapping MANE features (gene and cds). |
| 136 | + The genomic data is specified as a sequence location by `chromosome`, `start`, |
| 137 | + `end`. All CDS regions with which the input sequence location has nonzero base |
| 138 | + pair overlap will be returned. |
| 139 | +
|
| 140 | + :param start: GRCh38 start position |
| 141 | + :param end: GRCh38 end position |
| 142 | + :param chromosome: Chromosome. 1..22, X, or Y. If not provided, must provide |
| 143 | + `identifier`. If both `chromosome` and `identifier` are provided, |
| 144 | + `chromosome` will be used. |
| 145 | + :param identifier: Genomic identifier on GRCh38 assembly. If not provided, must |
| 146 | + provide `chromosome`. If both `chromosome` and `identifier` are provided, |
| 147 | + `chromosome` will be used. |
| 148 | + :param coordinate_type: Coordinate type for ``start`` and ``end`` |
| 149 | + :raise FeatureOverlapError: If missing required fields or unable to find |
| 150 | + associated ga4gh identifier |
| 151 | + :return: MANE feature (gene/cds) overlap data represented as a dict. The |
| 152 | + dictionary will be keyed by genes which overlap the input sequence location. |
| 153 | + Each gene contains a list of the overlapping CDS regions with the beginning |
| 154 | + and end of the input sequence location's overlap with each |
| 155 | + """ |
| 156 | + ga4gh_seq_id = None |
| 157 | + if chromosome: |
| 158 | + if not re.match(f"^{CHR_PATTERN}$", chromosome): |
| 159 | + error_msg = "`chromosome` must be 1, ..., 22, X, or Y" |
| 160 | + raise FeatureOverlapError(error_msg) |
| 161 | + else: |
| 162 | + if identifier: |
| 163 | + chromosome = self._get_chr_from_alt_ac(identifier) |
| 164 | + if identifier.startswith("ga4gh:SQ."): |
| 165 | + ga4gh_seq_id = identifier |
| 166 | + else: |
| 167 | + error_msg = "Must provide either `chromosome` or `identifier`" |
| 168 | + raise FeatureOverlapError(error_msg) |
| 169 | + |
| 170 | + # Convert residue to inter-residue |
| 171 | + if coordinate_type == CoordinateType.RESIDUE: |
| 172 | + start -= 1 |
| 173 | + |
| 174 | + # Get feature dataframe (df uses inter-residue) |
| 175 | + feature_df = self.df.filter( |
| 176 | + (pl.col("chromosome") == chromosome) |
| 177 | + & (pl.col("cds_start") <= end) |
| 178 | + & (pl.col("cds_stop") >= start) |
| 179 | + ) |
| 180 | + |
| 181 | + if feature_df.is_empty(): |
| 182 | + return None |
| 183 | + |
| 184 | + # Add overlap columns |
| 185 | + feature_df = feature_df.with_columns( |
| 186 | + [ |
| 187 | + pl.when(pl.col("cds_start") < start) |
| 188 | + .then(start) |
| 189 | + .otherwise(pl.col("cds_start")) |
| 190 | + .alias("overlap_start"), |
| 191 | + pl.when(pl.col("cds_stop") > end) |
| 192 | + .then(end) |
| 193 | + .otherwise(pl.col("cds_stop")) |
| 194 | + .alias("overlap_stop"), |
| 195 | + ] |
| 196 | + ) |
| 197 | + |
| 198 | + # Get ga4gh identifier for chromosome |
| 199 | + if not ga4gh_seq_id: |
| 200 | + grch38_chr = f"{Assembly.GRCH38.value}:{chromosome}" |
| 201 | + ga4gh_aliases, error_msg = self.seqrepo_access.translate_identifier( |
| 202 | + grch38_chr, "ga4gh" |
| 203 | + ) |
| 204 | + |
| 205 | + # Errors should never happen but catching just in case |
| 206 | + if error_msg: |
| 207 | + raise FeatureOverlapError(str(error_msg)) |
| 208 | + |
| 209 | + if not ga4gh_aliases: |
| 210 | + error_msg = f"Unable to find ga4gh identifier for: {grch38_chr}" |
| 211 | + raise FeatureOverlapError(error_msg) |
| 212 | + |
| 213 | + ga4gh_seq_id = ga4gh_aliases[0] |
| 214 | + |
| 215 | + def _get_seq_loc(start_pos: int, stop_pos: int, refget_ac: str) -> dict: |
| 216 | + """Get VRS Sequence Location represented as a dict |
| 217 | +
|
| 218 | + :param start_pos: Start position |
| 219 | + :param stop_pos: Stop position |
| 220 | + :param refget_ac: Refget Accession (SQ.) |
| 221 | + :return: VRS Sequence Location represented as dictionary with the ga4gh ID |
| 222 | + included |
| 223 | + """ |
| 224 | + _sl = SequenceLocation( |
| 225 | + sequenceReference=SequenceReference( |
| 226 | + refgetAccession=refget_ac, |
| 227 | + ), |
| 228 | + start=start_pos, |
| 229 | + end=stop_pos, |
| 230 | + ) |
| 231 | + ga4gh_identify(_sl) |
| 232 | + return _sl.model_dump(exclude_none=True) |
| 233 | + |
| 234 | + resp = {} |
| 235 | + refget_ac = ga4gh_seq_id.split("ga4gh:")[-1] |
| 236 | + for gene, group in feature_df.group_by("gene"): |
| 237 | + gene = gene[0] |
| 238 | + _gene_overlap_data = [ |
| 239 | + CdsOverlap( |
| 240 | + cds=_get_seq_loc( |
| 241 | + cds_row["cds_start"], cds_row["cds_stop"], refget_ac |
| 242 | + ), |
| 243 | + overlap=_get_seq_loc( |
| 244 | + cds_row["overlap_start"], cds_row["overlap_stop"], refget_ac |
| 245 | + ), |
| 246 | + ).model_dump(by_alias=True, exclude_none=True) |
| 247 | + for cds_row in group.iter_rows(named=True) |
| 248 | + ] |
| 249 | + resp[gene] = _gene_overlap_data |
| 250 | + |
| 251 | + return resp |
0 commit comments