|
| 1 | +"""Module for getting feature (gene/exon) overlap""" |
| 2 | +import re |
| 3 | +from pathlib import Path |
| 4 | +from typing import Dict, Optional |
| 5 | + |
| 6 | +import pandas as pd |
| 7 | +from ga4gh.core import ga4gh_identify |
| 8 | +from ga4gh.vrs import models |
| 9 | + |
| 10 | +from cool_seq_tool.data_sources import SeqRepoAccess |
| 11 | +from cool_seq_tool.paths import MANE_REFSEQ_GFF_PATH |
| 12 | +from cool_seq_tool.schemas import Assembly, CdsOverlap, ResidueMode |
| 13 | + |
| 14 | + |
| 15 | +# Pattern for chromosome |
| 16 | +CHR_PATTERN = r"X|Y|([1-9]|1[0-9]|2[0-2])" |
| 17 | + |
| 18 | + |
| 19 | +class FeatureOverlapError(Exception): |
| 20 | + """Custom exception for the Feature Overlap class""" |
| 21 | + |
| 22 | + |
| 23 | +class FeatureOverlap: |
| 24 | + """The class for getting feature overlap""" |
| 25 | + |
| 26 | + def __init__( |
| 27 | + self, |
| 28 | + seqrepo_access: SeqRepoAccess, |
| 29 | + mane_refseq_gff_path: Path = MANE_REFSEQ_GFF_PATH, |
| 30 | + ) -> None: |
| 31 | + """Initialize the FeatureOverlap class. Will load RefSeq data and store as df. |
| 32 | +
|
| 33 | + :param seqrepo_access: Client for accessing SeqRepo data |
| 34 | + :param mane_refseq_gff_path: Path to the MANE RefSeq GFF file |
| 35 | + """ |
| 36 | + self.seqrepo_access = seqrepo_access |
| 37 | + self.mane_refseq_gff_path = mane_refseq_gff_path |
| 38 | + self.df = self._load_mane_refseq_gff_data() |
| 39 | + |
| 40 | + def _load_mane_refseq_gff_data(self) -> pd.core.frame.DataFrame: |
| 41 | + """Load MANE RefSeq GFF data file into DataFrame. |
| 42 | +
|
| 43 | + :return: DataFrame containing MANE RefSeq GFF data for CDS. Columns include |
| 44 | + `type`, `chromosome` (chromosome without 'chr' prefix), `cds_start`, |
| 45 | + `cds_stop`, `info_name` (name of record), and `gene`. `cds_start` and |
| 46 | + `cds_stop` use inter-residue coordinates. |
| 47 | + """ |
| 48 | + df = pd.read_csv( |
| 49 | + self.mane_refseq_gff_path, |
| 50 | + sep="\t", |
| 51 | + header=None, |
| 52 | + skiprows=9, |
| 53 | + usecols=[0, 2, 3, 4, 8], |
| 54 | + ) |
| 55 | + df.columns = ["chromosome", "type", "cds_start", "cds_stop", "info"] |
| 56 | + |
| 57 | + # Restrict to only feature of interest: CDS (which has gene info) |
| 58 | + df = df[df["type"] == "CDS"].copy() |
| 59 | + |
| 60 | + # Get name from the info field |
| 61 | + df["info_name"] = df["info"].apply( |
| 62 | + lambda info: re.findall("Name=([^;]+)", info)[0] |
| 63 | + ) |
| 64 | + |
| 65 | + # Get gene from the info field |
| 66 | + df["gene"] = df["info"].apply(lambda info: re.findall("gene=([^;]+)", info)[0]) |
| 67 | + |
| 68 | + # Get chromosome names without prefix and without suffix for alternate |
| 69 | + # transcripts |
| 70 | + df["chromosome"] = df["chromosome"].apply( |
| 71 | + lambda chromosome: chromosome.strip("chr").split("_")[0] |
| 72 | + ) |
| 73 | + df["chromosome"] = df["chromosome"].astype(str) |
| 74 | + |
| 75 | + # Convert start and stop to ints |
| 76 | + df["cds_start"] = df["cds_start"].astype(int) |
| 77 | + df["cds_stop"] = df["cds_stop"].astype(int) |
| 78 | + |
| 79 | + # Convert to inter-residue coordinates |
| 80 | + df["cds_start"] -= 1 |
| 81 | + |
| 82 | + # Only retain certain columns |
| 83 | + df = df[["type", "chromosome", "cds_start", "cds_stop", "info_name", "gene"]] |
| 84 | + |
| 85 | + return df |
| 86 | + |
| 87 | + def _get_chr_from_alt_ac(self, identifier: str) -> str: |
| 88 | + """Get chromosome given genomic identifier |
| 89 | +
|
| 90 | + :param identifier: Genomic identifier on GRCh38 assembly |
| 91 | + :raises FeatureOverlapError: If unable to find associated GRCh38 chromosome |
| 92 | + :return: Chromosome. 1..22, X, Y. No 'chr' prefix. |
| 93 | + """ |
| 94 | + aliases, error_msg = self.seqrepo_access.translate_identifier( |
| 95 | + identifier, Assembly.GRCH38.value |
| 96 | + ) |
| 97 | + |
| 98 | + if error_msg: |
| 99 | + raise FeatureOverlapError(str(error_msg)) |
| 100 | + |
| 101 | + if not aliases: |
| 102 | + raise FeatureOverlapError( |
| 103 | + f"Unable to find {Assembly.GRCH38.value} aliases for: {identifier}" |
| 104 | + ) |
| 105 | + |
| 106 | + assembly_chr_pattern = rf"^{Assembly.GRCH38.value}:(?P<chromosome>{CHR_PATTERN})$" # noqa: E501 |
| 107 | + for a in aliases: |
| 108 | + chr_match = re.match(assembly_chr_pattern, a) |
| 109 | + if chr_match: |
| 110 | + break |
| 111 | + |
| 112 | + if not chr_match: |
| 113 | + raise FeatureOverlapError( |
| 114 | + f"Unable to find {Assembly.GRCH38.value} chromosome for: {identifier}" |
| 115 | + ) |
| 116 | + |
| 117 | + chr_groupdict = chr_match.groupdict() |
| 118 | + return chr_groupdict["chromosome"] |
| 119 | + |
| 120 | + def get_grch38_mane_gene_cds_overlap( |
| 121 | + self, |
| 122 | + start: int, |
| 123 | + end: int, |
| 124 | + chromosome: Optional[str] = None, |
| 125 | + identifier: Optional[str] = None, |
| 126 | + residue_mode: ResidueMode = ResidueMode.RESIDUE, |
| 127 | + ) -> Optional[Dict[str, CdsOverlap]]: |
| 128 | + """Given GRCh38 genomic data, find the overlapping MANE features (gene and cds). |
| 129 | + The genomic data is specified as a sequence location by `chromosome`, `start`, |
| 130 | + `end`. All CDS regions with which the input sequence location has nonzero base |
| 131 | + pair overlap will be returned. |
| 132 | +
|
| 133 | + :param start: GRCh38 start position |
| 134 | + :param end: GRCh38 end position |
| 135 | + :param chromosome: Chromosome. 1..22, X, or Y. If not provided, must provide |
| 136 | + `identifier`. If both `chromosome` and `identifier` are provided, |
| 137 | + `chromosome` will be used. |
| 138 | + :param identifier: Genomic identifier on GRCh38 assembly. If not provided, must |
| 139 | + provide `chromosome`. If both `chromosome` and `identifier` are provided, |
| 140 | + `chromosome` will be used. |
| 141 | + :param residue_mode: Residue mode for `start` and `end` |
| 142 | + :raise FeatureOverlapError: If missing required fields or unable to find |
| 143 | + associated ga4gh identifier |
| 144 | + :return: MANE feature (gene/cds) overlap data represented as a dict. The |
| 145 | + dictionary will be keyed by genes which overlap the input sequence location. |
| 146 | + Each gene contains a list of the overlapping CDS regions with the beginning |
| 147 | + and end of the input sequence location's overlap with each |
| 148 | + { |
| 149 | + gene: { |
| 150 | + 'cds': VRS Sequence Location |
| 151 | + 'overlap': VRS Sequence Location |
| 152 | + } |
| 153 | + } |
| 154 | + """ |
| 155 | + ga4gh_seq_id = None |
| 156 | + if chromosome: |
| 157 | + if not re.match(f"^{CHR_PATTERN}$", chromosome): |
| 158 | + raise FeatureOverlapError("`chromosome` must be 1, ..., 22, X, or Y") |
| 159 | + else: |
| 160 | + if identifier: |
| 161 | + chromosome = self._get_chr_from_alt_ac(identifier) |
| 162 | + if identifier.startswith("ga4gh:SQ."): |
| 163 | + ga4gh_seq_id = identifier |
| 164 | + else: |
| 165 | + raise FeatureOverlapError( |
| 166 | + "Must provide either `chromosome` or `identifier`" |
| 167 | + ) |
| 168 | + |
| 169 | + # Convert residue to inter-residue |
| 170 | + if residue_mode == ResidueMode.RESIDUE: |
| 171 | + start -= 1 |
| 172 | + |
| 173 | + # Get feature dataframe (df uses inter-residue) |
| 174 | + feature_df = self.df[ |
| 175 | + (self.df["chromosome"] == chromosome) |
| 176 | + & (self.df["cds_start"] <= end) # noqa: W503 |
| 177 | + & (self.df["cds_stop"] >= start) # noqa: W503 |
| 178 | + ].copy() |
| 179 | + |
| 180 | + if feature_df.empty: |
| 181 | + return None |
| 182 | + |
| 183 | + # Add overlap columns |
| 184 | + feature_df["overlap_start"] = feature_df["cds_start"].apply( |
| 185 | + lambda x: x if start <= x else start |
| 186 | + ) |
| 187 | + feature_df["overlap_stop"] = feature_df["cds_stop"].apply( |
| 188 | + lambda x: end if end <= x else x |
| 189 | + ) |
| 190 | + |
| 191 | + # Get ga4gh identifier for chromosome |
| 192 | + if not ga4gh_seq_id: |
| 193 | + grch38_chr = f"{Assembly.GRCH38.value}:{chromosome}" |
| 194 | + ga4gh_aliases, error_msg = self.seqrepo_access.translate_identifier( |
| 195 | + grch38_chr, "ga4gh" |
| 196 | + ) |
| 197 | + |
| 198 | + # Errors should never happen but catching just in case |
| 199 | + if error_msg: |
| 200 | + raise FeatureOverlapError(str(error_msg)) |
| 201 | + elif not ga4gh_aliases: |
| 202 | + raise FeatureOverlapError( |
| 203 | + f"Unable to find ga4gh identifier for: {grch38_chr}" |
| 204 | + ) |
| 205 | + |
| 206 | + ga4gh_seq_id = ga4gh_aliases[0] |
| 207 | + |
| 208 | + def _get_seq_loc(start_pos: int, stop_pos: int, ga4gh_sequence_id: str) -> Dict: |
| 209 | + """Get VRS Sequence Location represented as a dict |
| 210 | +
|
| 211 | + :param start_pos: Start position |
| 212 | + :param stop_pos: Stop position |
| 213 | + :param ga4gh_sequence_id: ga4gh sequence identifier |
| 214 | + :return: VRS Sequence Location represented as dictionary with the ga4gh ID |
| 215 | + included |
| 216 | + """ |
| 217 | + _sl = models.SequenceLocation( |
| 218 | + sequence_id=ga4gh_sequence_id, |
| 219 | + interval=models.SequenceInterval( |
| 220 | + start=models.Number(value=start_pos), |
| 221 | + end=models.Number(value=stop_pos), |
| 222 | + ), |
| 223 | + ) |
| 224 | + _sl._id = ga4gh_identify(_sl) |
| 225 | + return _sl.as_dict() |
| 226 | + |
| 227 | + resp = {} |
| 228 | + for gene, group in feature_df.groupby("gene"): |
| 229 | + _gene_overlap_data = [] |
| 230 | + |
| 231 | + for cds_row in group.itertuples(): |
| 232 | + _gene_overlap_data.append( |
| 233 | + CdsOverlap( |
| 234 | + cds=_get_seq_loc( |
| 235 | + cds_row.cds_start, cds_row.cds_stop, ga4gh_seq_id |
| 236 | + ), |
| 237 | + overlap=_get_seq_loc( |
| 238 | + cds_row.overlap_start, cds_row.overlap_stop, ga4gh_seq_id |
| 239 | + ), |
| 240 | + ).dict(by_alias=True) |
| 241 | + ) |
| 242 | + resp[gene] = _gene_overlap_data |
| 243 | + |
| 244 | + return resp |
0 commit comments