Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -132,3 +132,4 @@ bigwig_loader/_version.py
/example_data/bigwig
/deleted
.pfau.yml
scratch/
6 changes: 0 additions & 6 deletions bigwig_loader/__init__.py
@@ -14,12 +14,6 @@
__email__ = ["[email protected]"]

logger = logging.getLogger("bigwig_loader")
if not logger.hasHandlers():
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(levelname)s] %(name)s: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)


logger.warning(
179 changes: 108 additions & 71 deletions bigwig_loader/bigwig.py
@@ -1,7 +1,5 @@
import zlib
import logging
from pathlib import Path
from random import sample
from typing import BinaryIO
from typing import Iterable
from typing import Literal
from typing import Optional
@@ -22,13 +20,15 @@
from bigwig_loader.parser import ChromosomeTreeNode
from bigwig_loader.parser import RTreeIndexHeader
from bigwig_loader.parser import TotalSummary
from bigwig_loader.parser import WIGSectionHeader
from bigwig_loader.parser import ZoomHeader
from bigwig_loader.parser import collect_leaf_nodes
from bigwig_loader.store import BigWigStore
from bigwig_loader.subtract_intervals import subtract_interval_dataframe
from bigwig_loader.util import get_standard_chromosomes

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class BigWig:
def __init__(
@@ -50,6 +50,24 @@ def __init__(
self.id = id
self.scale = scale
self.bbi_header = BBIHeader.from_file(bigwig)

# Endianness check using the BBI header magic number.
# The UCSC BigWig spec defines the magic as 0x888FFC26. The parser unpacks it
# little-endian, so a valid BigWig file (bytes 26 FC 8F 88 on disk) reads back
# as 0x888FFC26; a short byte-order sketch follows the check below.
magic = self.bbi_header.magic
if magic == 0x888FFC26:
logger.debug(f"BBI header magic: {magic:#x} (valid BigWig file)")
elif magic == 0x26FC8F88:
# This would mean the file has bytes 88 8F FC 26, which is byte-swapped
logger.warning(
f"BBI header magic: {magic:#x} (byte-swapped BigWig - may need big-endian parsing)"
)
else:
logger.warning(
f"BBI header magic: {magic:#x} (unexpected value - not a valid BigWig file?)"
)
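
# Illustrative byte-order sketch (not part of this diff): how the same four
# header bytes read under each byte order. struct is used only for illustration
# and is not imported by the module.
import struct
_raw = bytes([0x26, 0xFC, 0x8F, 0x88])  # first four bytes of a valid BigWig on disk
assert struct.unpack("<I", _raw)[0] == 0x888FFC26  # little-endian read: the expected magic
assert struct.unpack(">I", _raw)[0] == 0x26FC8F88  # big-endian read: the byte-swapped value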

self.zoom_headers = ZoomHeader.all(
bigwig, first_offset=64, n_zoom_levels=self.bbi_header.zoom_levels
)
@@ -73,7 +91,7 @@ def __init__(
)

self.rtree_leaf_nodes = collect_leaf_nodes(file_object=bigwig, offset=None)
self.max_rows_per_chunk = self._guess_max_rows_per_chunk(bigwig)
self.max_rows_per_chunk = self.get_max_rows_per_chunk()

self.chrom_to_chrom_id: dict[str, int] = {
item.key: item.chrom_id for item in self.chromosome_head_node.items # type: ignore
@@ -90,13 +108,28 @@ def run_indexing(self, chromosome_offsets: npt.NDArray[np.int64]) -> None:
an index itself as well. But we prefer to recalculate
an index here.
"""

self.chromosome_offsets = chromosome_offsets
self.ncls_index, self.reference_data = self.build_ncls_index()
sorted_data = prepare_index_for_bigwig(
chromosome_offsets=chromosome_offsets,
rtree_leaf_nodes=self.rtree_leaf_nodes,
)
self.finalize_indexing(sorted_data)

def finalize_indexing(self, sorted_data: npt.NDArray[np.void]) -> None:
"""
Finalize indexing by building the NCLS interval index from the prepared data.
This should be called in the main thread/process; a usage sketch follows this method.
"""
self.reference_data = sorted_data
self.ncls_index = NCLS(
sorted_data["start_abs"],
sorted_data["end_abs"],
np.arange(len(sorted_data)),
)
self.store = BigWigStore(
self.path,
chunk_sizes=self.reference_data["data_size"], # type: ignore
chunk_offsets=self.reference_data["data_offset"], # type: ignore
chunk_sizes=sorted_data["data_size"],
chunk_offsets=sorted_data["data_offset"],
)
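
# Illustrative sketch (not part of this diff): the intended two-phase flow,
# assuming a BigWig instance `bw` and its chromosome offsets `offsets` exist.
# The collection code further down in this PR uses exactly this pattern.
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    # Phase 1: pure-numpy preparation; the arguments and result pickle cleanly,
    # so the work can run in a worker process.
    future = pool.submit(prepare_index_for_bigwig, offsets, bw.rtree_leaf_nodes)
    sorted_data = future.result()

# Phase 2: build the NCLS index and BigWigStore in the main process.
bw.chromosome_offsets = offsets
bw.finalize_indexing(sorted_data)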

def chromosomes(
@@ -275,44 +308,6 @@ def get_batch_offsets_and_sizes_with_global_positions(
)
return self.store.get_offsets_and_sizes(np.sort(np.unique(right_index)))

def build_ncls_index(self) -> tuple[NCLS, npt.NDArray[np.void]]:
dtype = np.dtype(
[
("start_chrom_ix", "u4"),
("start_base", "u4"),
("end_chrom_ix", "u4"),
("end_base", "u4"),
("data_offset", "u8"),
("data_size", "u8"),
("start_abs", "i8"),
("end_abs", "i8"),
]
)

data = np.empty(self.rtree_leaf_nodes.shape, dtype=dtype)
data["start_chrom_ix"] = self.rtree_leaf_nodes["start_chrom_ix"] # type: ignore
data["start_base"] = self.rtree_leaf_nodes["start_base"] # type: ignore
data["end_chrom_ix"] = self.rtree_leaf_nodes["end_chrom_ix"] # type: ignore
data["end_base"] = self.rtree_leaf_nodes["end_base"] # type: ignore
data["data_offset"] = self.rtree_leaf_nodes["data_offset"] # type: ignore
data["data_size"] = self.rtree_leaf_nodes["data_size"] # type: ignore
data["start_abs"] = self.make_positions_global(
self.rtree_leaf_nodes["start_chrom_ix"], self.rtree_leaf_nodes["start_base"] # type: ignore
).astype(np.int64)
data["end_abs"] = self.make_positions_global(
self.rtree_leaf_nodes["end_chrom_ix"], self.rtree_leaf_nodes["end_base"] # type: ignore
).astype(np.int64)

sort_indices = np.argsort(data["start_abs"])
sorted_data = data[sort_indices]

ncls = NCLS(
sorted_data["start_abs"],
sorted_data["end_abs"],
np.arange(len(sorted_data)),
)
return ncls, sorted_data

def search_ncls(
self, query_df: pd.DataFrame, use_key: bool = False
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]:
@@ -404,33 +399,75 @@ def _create_chrom_id_to_chrom_key(self) -> npt.NDArray[np.generic]:
mapping[chrom_id] = key
return mapping # type: ignore

def _guess_max_rows_per_chunk(
self, file_object: BinaryIO, sample_size: int = 100
) -> int:
def get_max_rows_per_chunk(self) -> int:
"""
Randomly samples some chunks and looks in the header
to see if in these chunks, the number of rows in the
decompressed data is always the same. This helps with
the ReferenceFileSystem.
Determines the maximum number of rows in any chunk.

Uses the BBI header's uncompress_buff_size as the authoritative source,
since this is the buffer size the file was created with and guarantees
all chunks can be decompressed into this space.

Args:
file_object: BigWig file opened as bytes.
sample_size: Number of chunks to sample for verification.
Returns:
Number of rows in a chunk
Maximum number of rows that could fit in the uncompressed buffer.
"""

rows_for_chunks = []

data_offsets = self.rtree_leaf_nodes["data_offset"]
data_sizes = self.rtree_leaf_nodes["data_size"]
if len(data_offsets) > sample_size:
sample_indices = sample(range(len(data_offsets)), sample_size) # nosec
data_offsets = data_offsets[sample_indices]
data_sizes = data_sizes[sample_indices]

for data_offset, data_size in zip(data_offsets, data_sizes):
file_object.seek(data_offset, 0) # type: ignore
decoded = zlib.decompress(file_object.read(data_size)) # type: ignore
header = WIGSectionHeader.from_bytes(decoded[:24])
rows_for_chunks.append(header.item_count)
return max(rows_for_chunks)
# The BBI header's uncompress_buff_size is the authoritative value: it defines
# the maximum uncompressed size of any chunk in the file. Each row is 12 bytes,
# preceded by a 24-byte section header; see the worked example just below.
return (self.bbi_header.uncompress_buff_size - 24) // 12
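
# Illustrative arithmetic (not part of this diff), assuming an uncompressed
# buffer size of 32768 bytes, a value commonly seen in BigWig files:
example_buff_size = 32768
assert (example_buff_size - 24) // 12 == 2728  # at most 2728 rows per chunk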


def prepare_index_for_bigwig(
chromosome_offsets: npt.NDArray[np.int64],
rtree_leaf_nodes: npt.NDArray[np.void],
) -> npt.NDArray[np.generic]:
"""
Standalone, module-level function for preparing the index data of one BigWig file.
Because it takes only picklable arguments, it can be submitted to a
ProcessPoolExecutor; a small query sketch follows this function.

Args:
chromosome_offsets: Offsets for converting local to global positions
rtree_leaf_nodes: The rtree leaf nodes from the bigwig file

Returns:
Structured array of leaf-node records, sorted by absolute start position.
"""
dtype = np.dtype(
[
("start_chrom_ix", "u4"),
("start_base", "u4"),
("end_chrom_ix", "u4"),
("end_base", "u4"),
("data_offset", "u8"),
("data_size", "u8"),
("start_abs", "i8"),
("end_abs", "i8"),
]
)

data = np.empty(rtree_leaf_nodes.shape, dtype=dtype)
data["start_chrom_ix"] = rtree_leaf_nodes["start_chrom_ix"]
data["start_base"] = rtree_leaf_nodes["start_base"]
data["end_chrom_ix"] = rtree_leaf_nodes["end_chrom_ix"]
data["end_base"] = rtree_leaf_nodes["end_base"]
data["data_offset"] = rtree_leaf_nodes["data_offset"]
data["data_size"] = rtree_leaf_nodes["data_size"]

# Inline equivalent of BigWig.make_positions_global: convert per-chromosome
# positions to global (absolute) coordinates.
data["start_abs"] = (
rtree_leaf_nodes["start_base"]
+ chromosome_offsets[rtree_leaf_nodes["start_chrom_ix"]]
).astype(np.int64)
data["end_abs"] = (
rtree_leaf_nodes["end_base"]
+ chromosome_offsets[rtree_leaf_nodes["end_chrom_ix"]]
).astype(np.int64)

sort_indices = np.argsort(data["start_abs"])
sorted_data = data[sort_indices]

return sorted_data
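
# Illustrative sketch (not part of this diff): querying the index built from the
# prepared data. `chromosome_offsets` and `rtree_leaf_nodes` are assumed to be
# available, and all_overlaps_both is assumed from the ncls package API.
import numpy as np
from ncls import NCLS

sorted_data = prepare_index_for_bigwig(chromosome_offsets, rtree_leaf_nodes)
index = NCLS(
    sorted_data["start_abs"],
    sorted_data["end_abs"],
    np.arange(len(sorted_data)),
)
starts = np.array([1_000_000], dtype=np.int64)
ends = np.array([1_010_000], dtype=np.int64)
query_idx, leaf_idx = index.all_overlaps_both(starts, ends, np.arange(len(starts)))
# leaf_idx points into sorted_data; the data_offset/data_size of those rows give
# the compressed file chunks covering the queried region.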
68 changes: 55 additions & 13 deletions bigwig_loader/collection.py
@@ -1,4 +1,7 @@
import logging
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from functools import cached_property
from pathlib import Path
from typing import Any
@@ -14,6 +17,7 @@

from bigwig_loader.batch_processor import BatchProcessor
from bigwig_loader.bigwig import BigWig
from bigwig_loader.bigwig import prepare_index_for_bigwig
from bigwig_loader.decompressor import Decoder
from bigwig_loader.memory_bank import MemoryBank
from bigwig_loader.merge_intervals import merge_interval_dataframe
@@ -22,6 +26,8 @@
from bigwig_loader.subtract_intervals import subtract_interval_dataframe
from bigwig_loader.util import chromosome_sort

logger = logging.getLogger(__name__)


class BigWigCollection:
"""
@@ -47,7 +53,7 @@ def __init__(
bigwig_path: Union[str, Sequence[str], Path, Sequence[Path]],
file_extensions: Sequence[str] = (".bigWig", ".bw"),
crawl: bool = True,
scale: Optional[dict[Union[str | Path], Any]] = None,
scale: Optional[dict[Union[str, Path], Any]] = None,
first_n_files: Optional[int] = None,
pinned_memory_size: int = 10000,
):
@@ -81,11 +87,49 @@ def __init__(
self.run_indexing()
self._batch_processor = None

def run_indexing(self) -> None:
for bigwig in self.bigwigs:
bigwig.run_indexing(
chromosome_offsets=self.local_chrom_ids_to_offset_matrix[bigwig.id]
)
def run_indexing(self, max_workers: Optional[int] = None) -> None:
"""
Run indexing for all bigwig files, in parallel across multiple CPUs when the
collection holds more than four files and max_workers is not 1; a usage sketch
follows this method.
"""
n_bigwigs = len(self.bigwigs)
if n_bigwigs == 0:
return

if n_bigwigs <= 4 or max_workers == 1:
for bigwig in self.bigwigs:
bigwig.run_indexing(
chromosome_offsets=self.local_chrom_ids_to_offset_matrix[bigwig.id]
)
return

if max_workers is None:
max_workers = min(os.cpu_count() or 4, n_bigwigs)

verbose = True  # could be gated on logger.isEnabledFor(logging.INFO)

with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
prepare_index_for_bigwig,
self.local_chrom_ids_to_offset_matrix[bigwig.id],
bigwig.rtree_leaf_nodes,
): bigwig
for bigwig in self.bigwigs
}

for i, future in enumerate(as_completed(futures), 1):
bigwig = futures[future]
sorted_data = future.result()
bigwig.chromosome_offsets = self.local_chrom_ids_to_offset_matrix[
bigwig.id
]
bigwig.finalize_indexing(sorted_data)

if verbose and (i % 100 == 0 or i == n_bigwigs):
print(f"\rIndexed {i}/{n_bigwigs}", end="", flush=True)

if verbose:
print()
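
# Illustrative usage sketch (not part of this diff): indexing runs during
# construction (in parallel when more than four files are present), and
# run_indexing can also be called again with an explicit worker count.
# The path below is hypothetical.
collection = BigWigCollection("example_data/bigwig", first_n_files=50)
collection.run_indexing(max_workers=8)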

def __len__(self) -> int:
return len(self.bigwigs)
@@ -217,13 +261,11 @@ def intervals(
Returns: pandas dataframe of intervals (chrom, start, end, value)

"""
logging.info("Collecting intervals from BigWig files.")
logger.info("Collecting intervals from BigWig files.")
interval_for_all_bigwigs = []
n_bigwigs = len(self.bigwigs)
for i, bw in enumerate(self.bigwigs):
logging.info(
f"Getting intervals for BigWig file {i}/{n_bigwigs}: {bw.path}"
)
logger.info(f"Getting intervals for BigWig file {i}/{n_bigwigs}: {bw.path}")
interval_for_all_bigwigs.append(
bw.intervals(
include_chromosomes=include_chromosomes,
@@ -244,14 +286,14 @@ def intervals(
intervals["start"] = intervals["start"].values.clip(padding) - padding # type: ignore
intervals["end"] = intervals["end"].values + padding # type: ignore
if merge:
logging.info(
logger.info(
"Merging intervals from different BigWig Files into one interval list."
)
intervals = merge_interval_dataframe(
intervals, is_sorted=False, allow_gap=merge_allow_gap
)
if blacklist is not None:
logging.info("Subtracting blacklisted regions.")
logger.info("Subtracting blacklisted regions.")
intervals = subtract_interval_dataframe(
intervals=intervals, blacklist=blacklist, buffer=blacklist_buffer
)
@@ -375,7 +417,7 @@ def create_global_position_system(
# All positions in bigwig files are encoded in 32 bits, so a global
# base position that exceeds uint32 would be a problem.
logging.warning(
logger.warning(
"The sum of sizes of the chromosomes exceeds the size of uint32"
)
for bigwig in self.bigwigs: