diff --git a/pairtools/cli/__init__.py b/pairtools/cli/__init__.py
index 9f42e44e..5d19b65e 100644
--- a/pairtools/cli/__init__.py
+++ b/pairtools/cli/__init__.py
@@ -202,4 +202,5 @@ def wrapper(*args, **kwargs):
     filterbycov,
     header,
     scaling,
+    coverage,
 )
diff --git a/pairtools/cli/coverage.py b/pairtools/cli/coverage.py
new file mode 100644
index 00000000..acb2dc00
--- /dev/null
+++ b/pairtools/cli/coverage.py
@@ -0,0 +1,192 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Command-line interface for coverage analysis.
+
+This module provides a Click-based CLI that wraps the coverage_api module.
+"""
+
+import sys
+
+import click
+import bioframe
+
+from ..lib import fileio
+from ..lib.coverage import calculate_coverage, read_pairs
+from . import cli, common_io_options
+
+
+UTIL_NAME = "pairtools_coverage"
+
+
+@cli.command()
+@click.argument(
+    "pairs_path",
+    type=str,
+    required=False,
+)
+@click.option(
+    "--side",
+    "-s",
+    type=click.Choice(["both", "1", "2"]),
+    default="1",
+    show_default=True,
+    help="both: both sides, 1: first side, 2: second side",
+)
+@click.option(
+    "--end",
+    type=click.Choice(["5", "3", "0"]),
+    default="0",
+    show_default=True,
+    help="5', 3' end, or 0 (default) - whatever is reported as pos1 and pos2 in pairs",
+)
+@click.option(
+    "--shift",
+    type=int,
+    default=0,
+    show_default=True,
+    help="Shift value for strand-specific adjustment. Positive values shift downstream "
+    "relative to the corresponding strand, negative values shift upstream. "
+    "Can be useful to e.g. account for nucleosome size in micro-C by shifting by 73 bp",
+)
+@click.option(
+    "--window-size",
+    type=int,
+    default=10,
+    show_default=True,
+    help="Window size for binning",
+)
+@click.option(
+    "--chunksize",
+    type=int,
+    default=1000000,
+    show_default=True,
+    help="Number of pairs to process at once",
+)
+@click.option(
+    "--threads",
+    "-t",
+    type=int,
+    default=1,
+    show_default=True,
+    help="Number of threads",
+)
+@click.option(
+    "--chromsizes",
+    type=str,
+    default=None,
+    help="Chromosome order used to filter pairs: "
+    "path to a chromosomes file (e.g. UCSC chrom.sizes or similar) whose "
+    "first column lists scaffold names. If not provided, will be extracted "
+    "from the header of the input pairs file.",
+)
+@click.option(
+    "-o",
+    "--output",
+    type=str,
+    default="",
+    help="output file."
+    " If the path ends with .gz or .lz4, the output is bgzip-/lz4c-compressed."
+    " By default, the output is printed into stdout.",
+)
+@click.option(
+    "--output-bigwig",
+    type=str,
+    default=None,
+    help="Output BigWig file (optional)",
+)
+@common_io_options
+def coverage(
+    pairs_path,
+    side,
+    end,
+    shift,
+    window_size,
+    chunksize,
+    threads,
+    chromsizes,
+    output,
+    output_bigwig,
+    **kwargs,
+):
+    """Generate coverage from pairs file.
+
+    PAIRS_PATH : input .pairs/.pairsam file. If the path ends with .gz or .lz4, the
+    input is decompressed by bgzip/lz4c.
+    By default, the input is read from stdin.
+    """
+    coverage_py(
+        pairs_path=pairs_path,
+        output=output,
+        side=side,
+        end=end,
+        shift=shift,
+        window_size=window_size,
+        chunksize=chunksize,
+        threads=threads,
+        chromsizes=chromsizes,
+        output_bigwig=output_bigwig,
+        **kwargs,
+    )
+
+
+def coverage_py(
+    pairs_path,
+    output,
+    side,
+    end,
+    shift,
+    window_size,
+    chunksize,
+    threads,
+    chromsizes,
+    output_bigwig,
+    **kwargs,
+):
+    """Generate coverage from pairs file.
+
+    PAIRS_PATH : input .pairs/.pairsam file. If the path ends with .gz or .lz4, the
+    input is decompressed by bgzip/lz4c.
+    By default, the input is read from stdin.
+    """
+
+    # Read pairs and take chromsizes from the pairs header; an explicit
+    # --chromsizes file, when given, overrides the header-derived sizes.
+    pairs_chunks, chromsizes_dict = read_pairs(
+        pairs_path if pairs_path else "-", chunksize, threads
+    )
+    if chromsizes is not None:
+        chromsizes_dict = bioframe.read_chromsizes(
+            chromsizes, filter_chroms=False, natsort=False
+        )
+
+    # Calculate coverage using the API
+    coverage_df = calculate_coverage(
+        pairs_chunks=pairs_chunks,
+        chromsizes=chromsizes_dict,
+        side=side,
+        end=end,
+        shift=shift,
+        window_size=window_size,
+        threads=threads,
+    )
+
+    # Save the results: stdout by default, bgzip-/lz4c-compressed when the
+    # output path ends with .gz/.lz4 (a bare to_csv("") would raise).
+    outstream = (
+        fileio.auto_open(output, mode="w", nproc=threads) if output else sys.stdout
+    )
+    coverage_df.to_csv(outstream, sep="\t", index=False, header=False)
+    if outstream is not sys.stdout:
+        outstream.close()
+
+    if output_bigwig is not None:
+        bioframe.to_bigwig(
+            coverage_df[["chrom", "start", "end", "count"]],
+            chromsizes_dict,
+            output_bigwig,
+        )
+
+
+if __name__ == "__main__":
+    coverage()
diff --git a/pairtools/lib/__init__.py b/pairtools/lib/__init__.py
index adce2962..a1d55faf 100644
--- a/pairtools/lib/__init__.py
+++ b/pairtools/lib/__init__.py
@@ -9,3 +9,4 @@
 from . import restrict
 from . import stats
 from . import select
+from . import coverage
diff --git a/pairtools/lib/coverage.py b/pairtools/lib/coverage.py
new file mode 100644
index 00000000..b08976d0
--- /dev/null
+++ b/pairtools/lib/coverage.py
@@ -0,0 +1,240 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Coverage analysis API for genomic pairs data.
+
+This module provides functions to calculate coverage from pairs files,
+supporting various filtering and binning options.
+"""
+
+import multiprocessing
+from functools import partial
+from typing import Iterator, Tuple, Union
+
+import numpy as np
+import pandas as pd
+from pairtools.lib import fileio, headerops
+
+pd.set_option("mode.copy_on_write", True)
+
+
+def read_pairs(
+    pairs: Union[str, object], chunksize: int, threads: int = 1
+) -> Tuple[Iterator[pd.DataFrame], Union[dict, pd.Series]]:
+    """
+    Read pairs file and extract chromosome sizes.
+
+    Args:
+        pairs: Path to pairs file or file-like object
+        chunksize: Number of pairs to process at once
+        threads: Number of threads for reading
+
+    Returns:
+        Tuple of (pairs DataFrame iterator, chromsizes dict)
+    """
+    pairs_stream = (
+        fileio.auto_open(
+            pairs,
+            mode="r",
+            nproc=threads,
+        )
+        if isinstance(pairs, str)
+        else pairs
+    )
+
+    header, pairs_body = headerops.get_header(pairs_stream)
+
+    cols = headerops.extract_column_names(header)
+    chromsizes = headerops.extract_chromsizes(header)
+
+    pairs_df = pd.read_csv(
+        pairs_body,
+        header=None,
+        names=cols,
+        chunksize=chunksize,
+        sep="\t",
+        dtype={"chrom1": str, "chrom2": str},
+        low_memory=False,
+    )  # type: ignore
+    return pairs_df, chromsizes
+
+
+def intervals_to_increments(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Convert genomic intervals to increment/decrement events.
+
+    Args:
+        df: DataFrame with columns 'chrom', 'start', 'end'
+
+    Returns:
+        DataFrame with columns 'chrom', 'pos', 'inc'
+    """
+    inc_df = pd.concat(
+        [
+            df[["chrom", "start"]].rename(columns={"start": "pos"}).eval("inc=1"),
+            df[["chrom", "end"]].rename(columns={"end": "pos"}).eval("inc=-1"),
+        ]
+    )
+    return inc_df
+
+
+def aggregate_increments(inc_df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Aggregate increment events by position.
+
+    Args:
+        inc_df: DataFrame with increment events
+
+    Returns:
+        Aggregated DataFrame sorted by chromosome and position
+    """
+    inc_df = (
+        inc_df.groupby(["chrom", "pos"])
+        .sum()
+        .reset_index()
+        .sort_values(["chrom", "pos"], ignore_index=True)
+    )
+    return inc_df
+
+
+def coverage_chunk(df_chunk: pd.DataFrame, binsize: int) -> pd.DataFrame:
+    """
+    Calculate coverage for a chunk of intervals.
+
+    Args:
+        df_chunk: DataFrame with genomic intervals
+        binsize: Size of bins for aggregation
+
+    Returns:
+        DataFrame with coverage counts per bin
+    """
+    count_df = aggregate_increments(intervals_to_increments(df_chunk))
+
+    count_df.insert(2, "count", count_df["inc"].cumsum())
+    count_df["bin"] = count_df["pos"] // binsize
+    if binsize > 1:
+        count_df = count_df.groupby(["chrom", "bin"], as_index=False).agg(
+            {"count": "sum"}
+        )
+    count_df["bin"] = count_df["bin"].astype(int)
+    return count_df
+
+
+def produce_chunks(
+    pairs_stream: Iterator[pd.DataFrame], side: str, end: str
+) -> Iterator[pd.DataFrame]:
+    """
+    Generate chunks of genomic intervals from pairs stream.
+
+    Args:
+        pairs_stream: Iterator of pairs DataFrames
+        side: Which side to use ("both"=both sides, "1"=first, "2"=second)
+        end: Which end to use ("0"=pos1/pos2 as reported, "3"=3', "5"=5')
+
+    Yields:
+        DataFrames with genomic intervals
+    """
+    # end "0" means "use pos1/pos2 as-is", i.e. no pos5/pos3 column suffix.
+    if end == "0":
+        end = ""
+    for pairs_df in pairs_stream:
+        if pairs_df.shape[0] == 0:
+            continue
+        # For "both", emit one interval chunk per side (avoid shadowing `side`).
+        sides = ["1", "2"] if side == "both" else [side]
+        for current_side in sides:
+            pairs_df["start"] = pairs_df[f"pos{end}{current_side}"] - 1
+            pairs_df["end"] = pairs_df[f"pos{end}{current_side}"]
+            pairs_df["chrom"] = pairs_df[f"chrom{current_side}"]
+            pairs_df["strand"] = pairs_df[f"strand{current_side}"]
+            yield pairs_df[["chrom", "start", "end", "strand"]]
+
+
+def process_chunk(
+    pairs_chunk: pd.DataFrame,
+    shift: int,
+    binsize: int,
+    chromsizes: Union[dict, pd.Series],
+) -> pd.DataFrame:
+    """
+    Process a single chunk of pairs data.
+
+    Args:
+        pairs_chunk: DataFrame with pairs data
+        shift: Shift value for strand-specific adjustment
+        binsize: Bin size for coverage calculation
+        chromsizes: Dictionary of chromosome sizes
+
+    Returns:
+        DataFrame with coverage data
+    """
+    # Shift downstream relative to the strand: +shift on "+", -shift otherwise.
+    shift_values = np.where(pairs_chunk["strand"] == "+", shift, -shift)
+    pairs_chunk[["start", "end"]] = pairs_chunk[["start", "end"]].add(
+        shift_values, axis=0
+    )
+    pairs_chunk = pairs_chunk[
+        (pairs_chunk["start"] >= 0)
+        & (pairs_chunk["end"] <= (pairs_chunk["chrom"].map(chromsizes)))
+    ]
+    pairs_chunk = pairs_chunk[["chrom", "start", "end"]]
+    pairs_chunk.sort_values(["chrom", "start", "end"], inplace=True)
+    pairs_chunk.reset_index(drop=True, inplace=True)
+    return coverage_chunk(pairs_chunk, binsize)
+
+
+def calculate_coverage(
+    pairs_chunks: Iterator[pd.DataFrame],
+    chromsizes: Union[dict, pd.Series],
+    side: str = "1",
+    end: str = "0",
+    shift: int = 0,
+    window_size: int = 10,
+    threads: int = 1,
+) -> pd.DataFrame:
+    """
+    Calculate coverage from pairs chunks.
+
+    Args:
+        pairs_chunks: Iterator of DataFrames with pairs data
+        chromsizes: Dictionary or pandas Series with chromosome sizes
+        side: Which side to use ("both"=both sides, "1"=first, "2"=second)
+        end: Which end to use ("0"=pos1/pos2 as reported, "3"=3', "5"=5')
+        shift: Shift value for strand-specific adjustment
+        window_size: Window size for binning
+        threads: Number of threads to use
+
+    Returns:
+        DataFrame with coverage data
+    """
+    chunks = produce_chunks(pairs_chunks, side, end)
+    func = partial(
+        process_chunk, shift=shift, binsize=window_size, chromsizes=chromsizes
+    )
+
+    if threads > 1:
+        with multiprocessing.Pool(threads) as pool:
+            coverage_chunks = pool.map(func, chunks)
+    else:
+        coverage_chunks = map(func, chunks)
+
+    coverage_df = (
+        pd.concat(coverage_chunks, ignore_index=True)
+        .groupby(["chrom", "bin"], as_index=False)
+        .agg({"count": "sum"})
+        .reset_index(drop=True)
+    )
+    coverage_df["start"] = (coverage_df["bin"] * window_size).astype(int)
+    coverage_df["end"] = coverage_df["start"] + window_size
+    coverage_df["end"] = np.where(
+        coverage_df["end"] <= coverage_df["chrom"].map(chromsizes),
+        coverage_df["end"],
+        coverage_df["chrom"].map(chromsizes),
+    )
+
+    # Filter out empty intervals
+    coverage_df = coverage_df[coverage_df["end"] - coverage_df["start"] > 0]
+    coverage_df = coverage_df[["chrom", "start", "end", "count"]]
+
+    return coverage_df