|
| 1 | +""" |
| 2 | +Coverage metrics extractor for OneRoof reporting. |
| 3 | +
|
| 4 | +Parses bedtools genomecov output (BED format with -bga flag) to compute |
| 5 | +coverage statistics for a sample. The output is a per-base BED file with |
| 6 | +columns: chrom, start, end, depth. |
| 7 | +
|
| 8 | +Example input (from `bedtools genomecov -bga`): |
| 9 | + MN908947.3 0 54 0 |
| 10 | + MN908947.3 54 100 15 |
| 11 | + MN908947.3 100 200 150 |
| 12 | + ... |
| 13 | +""" |
| 14 | + |
| 15 | +from pathlib import Path |
| 16 | + |
| 17 | +import polars as pl |
| 18 | +from pydantic import BaseModel, Field |
| 19 | + |
| 20 | + |
| 21 | +class CoverageMetrics(BaseModel): |
| 22 | + """Intermediate coverage metrics extracted from genomecov BED output.""" |
| 23 | + |
| 24 | + sample_id: str |
| 25 | + total_bases: int = Field(ge=0, description="Total bases in reference") |
| 26 | + mean_coverage: float = Field(ge=0, description="Weighted mean depth") |
| 27 | + median_coverage: float = Field(ge=0, description="Weighted median depth") |
| 28 | + genome_coverage_at_1x: float = Field( |
| 29 | + ge=0, le=1, description="Fraction of genome with ≥1x coverage" |
| 30 | + ) |
| 31 | + genome_coverage_at_10x: float = Field( |
| 32 | + ge=0, le=1, description="Fraction of genome with ≥10x coverage" |
| 33 | + ) |
| 34 | + genome_coverage_at_100x: float = Field( |
| 35 | + ge=0, le=1, description="Fraction of genome with ≥100x coverage" |
| 36 | + ) |
| 37 | + min_coverage: int = Field(ge=0, description="Minimum depth observed") |
| 38 | + max_coverage: int = Field(ge=0, description="Maximum depth observed") |
| 39 | + |
| 40 | + |
| 41 | +def load_coverage_bed(bed_path: Path) -> pl.DataFrame: |
| 42 | + """ |
| 43 | + Load a bedtools genomecov BED file. |
| 44 | +
|
| 45 | + Args: |
| 46 | + bed_path: Path to the per-base BED file from `bedtools genomecov -bga` |
| 47 | +
|
| 48 | + Returns: |
| 49 | + DataFrame with columns: chrom, start, end, depth |
| 50 | + """ |
| 51 | + return pl.read_csv( |
| 52 | + bed_path, |
| 53 | + separator="\t", |
| 54 | + has_header=False, |
| 55 | + new_columns=["chrom", "start", "end", "depth"], |
| 56 | + schema={ |
| 57 | + "chrom": pl.Utf8, |
| 58 | + "start": pl.Int64, |
| 59 | + "end": pl.Int64, |
| 60 | + "depth": pl.Int64, |
| 61 | + }, |
| 62 | + ) |
| 63 | + |
| 64 | + |
| 65 | +def compute_coverage_stats(df: pl.DataFrame) -> dict: |
| 66 | + """ |
| 67 | + Compute coverage statistics from a genomecov DataFrame. |
| 68 | +
|
| 69 | + Args: |
| 70 | + df: DataFrame with columns: chrom, start, end, depth |
| 71 | +
|
| 72 | + Returns: |
| 73 | + Dictionary with computed coverage statistics |
| 74 | + """ |
| 75 | + # Calculate region lengths |
| 76 | + df = df.with_columns((pl.col("end") - pl.col("start")).alias("length")) |
| 77 | + |
| 78 | + total_bases = df["length"].sum() |
| 79 | + |
| 80 | + if total_bases == 0: |
| 81 | + return { |
| 82 | + "total_bases": 0, |
| 83 | + "mean_coverage": 0.0, |
| 84 | + "median_coverage": 0.0, |
| 85 | + "genome_coverage_at_1x": 0.0, |
| 86 | + "genome_coverage_at_10x": 0.0, |
| 87 | + "genome_coverage_at_100x": 0.0, |
| 88 | + "min_coverage": 0, |
| 89 | + "max_coverage": 0, |
| 90 | + } |
| 91 | + |
| 92 | + # Weighted mean: sum(depth * length) / total_bases |
| 93 | + weighted_sum = (df["depth"] * df["length"]).sum() |
| 94 | + mean_coverage = weighted_sum / total_bases |
| 95 | + |
| 96 | + # Weighted median: expand depths by length and find median |
| 97 | + # For efficiency with large genomes, we compute this from the cumulative distribution |
| 98 | + sorted_df = df.sort("depth") |
| 99 | + sorted_df = sorted_df.with_columns( |
| 100 | + (pl.col("length").cum_sum() / total_bases).alias("cumulative_frac") |
| 101 | + ) |
| 102 | + # Find the first row where cumulative fraction >= 0.5 |
| 103 | + median_row = sorted_df.filter(pl.col("cumulative_frac") >= 0.5).head(1) |
| 104 | + median_coverage = float(median_row["depth"][0]) if len(median_row) > 0 else 0.0 |
| 105 | + |
| 106 | + # Coverage at thresholds |
| 107 | + bases_at_1x = df.filter(pl.col("depth") >= 1)["length"].sum() |
| 108 | + bases_at_10x = df.filter(pl.col("depth") >= 10)["length"].sum() |
| 109 | + bases_at_100x = df.filter(pl.col("depth") >= 100)["length"].sum() |
| 110 | + |
| 111 | + # Min and max coverage |
| 112 | + # Note: Polars min/max return PythonLiteral which includes int at runtime |
| 113 | + min_cov_value = df["depth"].min() |
| 114 | + max_cov_value = df["depth"].max() |
| 115 | + min_coverage = 0 if min_cov_value is None else int(min_cov_value) # type: ignore[arg-type] |
| 116 | + max_coverage = 0 if max_cov_value is None else int(max_cov_value) # type: ignore[arg-type] |
| 117 | + |
| 118 | + return { |
| 119 | + "total_bases": int(total_bases), |
| 120 | + "mean_coverage": float(mean_coverage), |
| 121 | + "median_coverage": float(median_coverage), |
| 122 | + "genome_coverage_at_1x": float(bases_at_1x / total_bases), |
| 123 | + "genome_coverage_at_10x": float(bases_at_10x / total_bases), |
| 124 | + "genome_coverage_at_100x": float(bases_at_100x / total_bases), |
| 125 | + "min_coverage": min_coverage, |
| 126 | + "max_coverage": max_coverage, |
| 127 | + } |
| 128 | + |
| 129 | + |
| 130 | +def extract(sample_id: str, bed_path: Path) -> CoverageMetrics: |
| 131 | + """ |
| 132 | + Extract coverage metrics from a bedtools genomecov BED file. |
| 133 | +
|
| 134 | + Args: |
| 135 | + sample_id: Sample identifier |
| 136 | + bed_path: Path to the per-base BED file |
| 137 | +
|
| 138 | + Returns: |
| 139 | + Validated CoverageMetrics model |
| 140 | + """ |
| 141 | + df = load_coverage_bed(bed_path) |
| 142 | + stats = compute_coverage_stats(df) |
| 143 | + return CoverageMetrics(sample_id=sample_id, **stats) |
0 commit comments