
Commit 4034752

Merge pull request #23 from pfizer-opensource/debug/cuda_errors_for_long_sequences
Debug/cuda errors for long sequences
2 parents b34da81 + 870fc35 commit 4034752

File tree

13 files changed: +2115 -392 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -132,3 +132,4 @@ bigwig_loader/_version.py
 /example_data/bigwig
 /deleted
 .pfau.yml
+scratch/

bigwig_loader/__init__.py

Lines changed: 0 additions & 6 deletions
@@ -14,12 +14,6 @@
 __email__ = ["joren.retel@pfizer.com"]
 
 logger = logging.getLogger("bigwig_loader")
-if not logger.hasHandlers():
-    handler = logging.StreamHandler()
-    formatter = logging.Formatter("[%(levelname)s] %(name)s: %(message)s")
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    logger.setLevel(logging.WARNING)
 
 
 logger.warning(
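
The removed block had been attaching a StreamHandler to the package logger at import time; after this commit the library only emits records and leaves handler setup to the application, which is the usual convention for library logging. As a minimal sketch (standard library only, not part of the commit), an application that wants to see these messages could configure logging itself:

import logging

# Application-side configuration: bigwig_loader no longer installs its own handler,
# so the consuming script decides where records from "bigwig_loader.*" loggers go.
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s] %(name)s: %(message)s",
)

# Optional: turn up verbosity for this package specifically.
logging.getLogger("bigwig_loader").setLevel(logging.DEBUG)
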

bigwig_loader/bigwig.py

Lines changed: 108 additions & 71 deletions
@@ -1,7 +1,5 @@
-import zlib
+import logging
 from pathlib import Path
-from random import sample
-from typing import BinaryIO
 from typing import Iterable
 from typing import Literal
 from typing import Optional

@@ -22,13 +20,15 @@
 from bigwig_loader.parser import ChromosomeTreeNode
 from bigwig_loader.parser import RTreeIndexHeader
 from bigwig_loader.parser import TotalSummary
-from bigwig_loader.parser import WIGSectionHeader
 from bigwig_loader.parser import ZoomHeader
 from bigwig_loader.parser import collect_leaf_nodes
 from bigwig_loader.store import BigWigStore
 from bigwig_loader.subtract_intervals import subtract_interval_dataframe
 from bigwig_loader.util import get_standard_chromosomes
 
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
 
 class BigWig:
     def __init__(

@@ -50,6 +50,24 @@ def __init__(
         self.id = id
         self.scale = scale
         self.bbi_header = BBIHeader.from_file(bigwig)
+
+        # Endianness check using BBI header magic
+        # UCSC BigWig spec: magic is 0x888FFC26
+        # Parser uses little-endian unpacking, so valid BigWig files will read as 0x888FFC26
+        # (the bytes in file are 26 FC 8F 88, which when read little-endian gives 0x888FFC26)
+        magic = self.bbi_header.magic
+        if magic == 0x888FFC26:
+            logger.debug(f"BBI header magic: {magic:#x} (valid BigWig file)")
+        elif magic == 0x26FC8F88:
+            # This would mean the file has bytes 88 8F FC 26, which is byte-swapped
+            logger.warning(
+                f"BBI header magic: {magic:#x} (byte-swapped BigWig - may need big-endian parsing)"
+            )
+        else:
+            logger.warning(
+                f"BBI header magic: {magic:#x} (unexpected value - not a valid BigWig file?)"
+            )
+
         self.zoom_headers = ZoomHeader.all(
             bigwig, first_offset=64, n_zoom_levels=self.bbi_header.zoom_levels
         )

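
The check added here tells byte orders apart by comparing the parsed magic against the two possible readings of the same four bytes. A quick standalone illustration of that arithmetic (only the 0x888FFC26 constant comes from the BBI spec cited in the comments; the rest is a sketch, not part of the commit):

import struct

magic_bytes = bytes([0x26, 0xFC, 0x8F, 0x88])  # first four bytes of a little-endian BigWig file

as_little_endian = struct.unpack("<I", magic_bytes)[0]  # 0x888FFC26, the spec value
as_big_endian = struct.unpack(">I", magic_bytes)[0]     # 0x26FC8F88, what a byte-swapped read yields

assert as_little_endian == 0x888FFC26
assert as_big_endian == 0x26FC8F88
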
@@ -73,7 +91,7 @@ def __init__(
         )
 
         self.rtree_leaf_nodes = collect_leaf_nodes(file_object=bigwig, offset=None)
-        self.max_rows_per_chunk = self._guess_max_rows_per_chunk(bigwig)
+        self.max_rows_per_chunk = self.get_max_rows_per_chunk()
 
         self.chrom_to_chrom_id: dict[str, int] = {
             item.key: item.chrom_id for item in self.chromosome_head_node.items  # type: ignore

@@ -90,13 +108,28 @@ def run_indexing(self, chromosome_offsets: npt.NDArray[np.int64]) -> None:
         an index itself as well. But we prefer to recalculate
         an index here.
         """
-
         self.chromosome_offsets = chromosome_offsets
-        self.ncls_index, self.reference_data = self.build_ncls_index()
+        sorted_data = prepare_index_for_bigwig(
+            chromosome_offsets=chromosome_offsets,
+            rtree_leaf_nodes=self.rtree_leaf_nodes,
+        )
+        self.finalize_indexing(sorted_data)
+
+    def finalize_indexing(self, sorted_data: npt.NDArray[np.void]) -> None:
+        """
+        Finalize indexing by building NCLS from prepared data.
+        This should be called in the main thread/process.
+        """
+        self.reference_data = sorted_data
+        self.ncls_index = NCLS(
+            sorted_data["start_abs"],
+            sorted_data["end_abs"],
+            np.arange(len(sorted_data)),
+        )
         self.store = BigWigStore(
             self.path,
-            chunk_sizes=self.reference_data["data_size"],  # type: ignore
-            chunk_offsets=self.reference_data["data_offset"],  # type: ignore
+            chunk_sizes=sorted_data["data_size"],
+            chunk_offsets=sorted_data["data_offset"],
         )
 
     def chromosomes(

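
Splitting run_indexing into a preparation step and finalize_indexing means only the plain NumPy array travels between steps, while the NCLS interval index is built in the main process (its docstring says as much); presumably the index object is cheap to rebuild but awkward to ship between processes. For readers unfamiliar with NCLS: it is queried with the absolute (genome-wide) coordinates computed above. A toy sketch, assuming the ncls package's NCLS(starts, ends, ids) constructor and its all_overlaps_both query method — the data values here are illustrative only:

import numpy as np
from ncls import NCLS

# Stand-ins for sorted_data["start_abs"] / sorted_data["end_abs"]: three chunks
# laid out on a single global coordinate axis.
starts = np.array([0, 1_000, 5_000], dtype=np.int64)
ends = np.array([1_000, 5_000, 9_000], dtype=np.int64)
index = NCLS(starts, ends, np.arange(len(starts), dtype=np.int64))

# Which chunks overlap the query interval [800, 1200)?
query_starts = np.array([800], dtype=np.int64)
query_ends = np.array([1_200], dtype=np.int64)
query_ids, chunk_ids = index.all_overlaps_both(
    query_starts, query_ends, np.arange(len(query_starts), dtype=np.int64)
)
print(chunk_ids)  # expected: chunks 0 and 1 of the reference data
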
@@ -275,44 +308,6 @@ def get_batch_offsets_and_sizes_with_global_positions(
         )
         return self.store.get_offsets_and_sizes(np.sort(np.unique(right_index)))
 
-    def build_ncls_index(self) -> tuple[NCLS, npt.NDArray[np.void]]:
-        dtype = np.dtype(
-            [
-                ("start_chrom_ix", "u4"),
-                ("start_base", "u4"),
-                ("end_chrom_ix", "u4"),
-                ("end_base", "u4"),
-                ("data_offset", "u8"),
-                ("data_size", "u8"),
-                ("start_abs", "i8"),
-                ("end_abs", "i8"),
-            ]
-        )
-
-        data = np.empty(self.rtree_leaf_nodes.shape, dtype=dtype)
-        data["start_chrom_ix"] = self.rtree_leaf_nodes["start_chrom_ix"]  # type: ignore
-        data["start_base"] = self.rtree_leaf_nodes["start_base"]  # type: ignore
-        data["end_chrom_ix"] = self.rtree_leaf_nodes["end_chrom_ix"]  # type: ignore
-        data["end_base"] = self.rtree_leaf_nodes["end_base"]  # type: ignore
-        data["data_offset"] = self.rtree_leaf_nodes["data_offset"]  # type: ignore
-        data["data_size"] = self.rtree_leaf_nodes["data_size"]  # type: ignore
-        data["start_abs"] = self.make_positions_global(
-            self.rtree_leaf_nodes["start_chrom_ix"], self.rtree_leaf_nodes["start_base"]  # type: ignore
-        ).astype(np.int64)
-        data["end_abs"] = self.make_positions_global(
-            self.rtree_leaf_nodes["end_chrom_ix"], self.rtree_leaf_nodes["end_base"]  # type: ignore
-        ).astype(np.int64)
-
-        sort_indices = np.argsort(data["start_abs"])
-        sorted_data = data[sort_indices]
-
-        ncls = NCLS(
-            sorted_data["start_abs"],
-            sorted_data["end_abs"],
-            np.arange(len(sorted_data)),
-        )
-        return ncls, sorted_data
-
     def search_ncls(
         self, query_df: pd.DataFrame, use_key: bool = False
     ) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]:

@@ -404,33 +399,75 @@ def _create_chrom_id_to_chrom_key(self) -> npt.NDArray[np.generic]:
             mapping[chrom_id] = key
         return mapping  # type: ignore
 
-    def _guess_max_rows_per_chunk(
-        self, file_object: BinaryIO, sample_size: int = 100
-    ) -> int:
+    def get_max_rows_per_chunk(self) -> int:
         """
-        Randomly samples some chunks and looks in the header
-        to see if in these chunks, the number of rows in the
-        decompressed data is always the same. This helps with
-        the ReferenceFileSystem.
+        Determines the maximum number of rows in any chunk.
+
+        Uses the BBI header's uncompress_buff_size as the authoritative source,
+        since this is the buffer size the file was created with and guarantees
+        all chunks can be decompressed into this space.
 
         Args:
             file_object: BigWig file opened as bytes.
+            sample_size: Number of chunks to sample for verification.
         Returns:
-            Number of rows in a chunk
+            Maximum number of rows that could fit in the uncompressed buffer.
         """
-
-        rows_for_chunks = []
-
-        data_offsets = self.rtree_leaf_nodes["data_offset"]
-        data_sizes = self.rtree_leaf_nodes["data_size"]
-        if len(data_offsets) > sample_size:
-            sample_indices = sample(range(len(data_offsets)), sample_size)  # nosec
-            data_offsets = data_offsets[sample_indices]
-            data_sizes = data_sizes[sample_indices]
-
-        for data_offset, data_size in zip(data_offsets, data_sizes):
-            file_object.seek(data_offset, 0)  # type: ignore
-            decoded = zlib.decompress(file_object.read(data_size))  # type: ignore
-            header = WIGSectionHeader.from_bytes(decoded[:24])
-            rows_for_chunks.append(header.item_count)
-        return max(rows_for_chunks)
+        # The BBI header's uncompress_buff_size is the authoritative value
+        # It defines the maximum uncompressed size for any chunk in the file
+        # Each row is 12 bytes, plus 24-byte header
+        return (self.bbi_header.uncompress_buff_size - 24) // 12
+
+
+def prepare_index_for_bigwig(
+    chromosome_offsets: npt.NDArray[np.int64],
+    rtree_leaf_nodes: npt.NDArray[np.void],
+) -> npt.NDArray[np.generic]:
+    """
+    Standalone function for preparing index data in a separate process.
+    This can be pickled and run with ProcessPoolExecutor.
+
+    Args:
+        path: Path to bigwig file (for identification)
+        bigwig_id: ID of the bigwig file
+        chromosome_offsets: Offsets for converting local to global positions
+        rtree_leaf_nodes: The rtree leaf nodes from the bigwig file
+
+    Returns:
+        sorted data
+    """
+    dtype = np.dtype(
+        [
+            ("start_chrom_ix", "u4"),
+            ("start_base", "u4"),
+            ("end_chrom_ix", "u4"),
+            ("end_base", "u4"),
+            ("data_offset", "u8"),
+            ("data_size", "u8"),
+            ("start_abs", "i8"),
+            ("end_abs", "i8"),
+        ]
+    )
+
+    data = np.empty(rtree_leaf_nodes.shape, dtype=dtype)
+    data["start_chrom_ix"] = rtree_leaf_nodes["start_chrom_ix"]
+    data["start_base"] = rtree_leaf_nodes["start_base"]
+    data["end_chrom_ix"] = rtree_leaf_nodes["end_chrom_ix"]
+    data["end_base"] = rtree_leaf_nodes["end_base"]
+    data["data_offset"] = rtree_leaf_nodes["data_offset"]
+    data["data_size"] = rtree_leaf_nodes["data_size"]
+
+    # make_positions_global inline
+    data["start_abs"] = (
+        rtree_leaf_nodes["start_base"]
+        + chromosome_offsets[rtree_leaf_nodes["start_chrom_ix"]]
+    ).astype(np.int64)
+    data["end_abs"] = (
+        rtree_leaf_nodes["end_base"]
+        + chromosome_offsets[rtree_leaf_nodes["end_chrom_ix"]]
+    ).astype(np.int64)
+
+    sort_indices = np.argsort(data["start_abs"])
+    sorted_data = data[sort_indices]
+
+    return sorted_data
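
The rewritten get_max_rows_per_chunk above replaces random chunk sampling and decompression with a closed-form bound read from the BBI header: with a 24-byte section header and 12 bytes per row (as the comment in the diff states), no chunk can hold more than (uncompress_buff_size - 24) // 12 rows. A worked example; the 32768-byte buffer size is an illustrative value, not something this commit asserts:

def max_rows_per_chunk(uncompress_buff_size: int) -> int:
    # 24-byte section header, then fixed-size 12-byte rows
    return (uncompress_buff_size - 24) // 12

print(max_rows_per_chunk(32_768))  # (32768 - 24) // 12 == 2728
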

bigwig_loader/collection.py

Lines changed: 55 additions & 13 deletions
@@ -1,4 +1,7 @@
 import logging
+import os
+from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import as_completed
 from functools import cached_property
 from pathlib import Path
 from typing import Any

@@ -14,6 +17,7 @@
 
 from bigwig_loader.batch_processor import BatchProcessor
 from bigwig_loader.bigwig import BigWig
+from bigwig_loader.bigwig import prepare_index_for_bigwig
 from bigwig_loader.decompressor import Decoder
 from bigwig_loader.memory_bank import MemoryBank
 from bigwig_loader.merge_intervals import merge_interval_dataframe

@@ -22,6 +26,8 @@
 from bigwig_loader.subtract_intervals import subtract_interval_dataframe
 from bigwig_loader.util import chromosome_sort
 
+logger = logging.getLogger(__name__)
+
 
 class BigWigCollection:
     """

@@ -47,7 +53,7 @@ def __init__(
         bigwig_path: Union[str, Sequence[str], Path, Sequence[Path]],
         file_extensions: Sequence[str] = (".bigWig", ".bw"),
         crawl: bool = True,
-        scale: Optional[dict[Union[str | Path], Any]] = None,
+        scale: Optional[dict[Union[str, Path], Any]] = None,
        first_n_files: Optional[int] = None,
         pinned_memory_size: int = 10000,
     ):

@@ -81,11 +87,49 @@ def __init__(
         self.run_indexing()
         self._batch_processor = None
 
-    def run_indexing(self) -> None:
-        for bigwig in self.bigwigs:
-            bigwig.run_indexing(
-                chromosome_offsets=self.local_chrom_ids_to_offset_matrix[bigwig.id]
-            )
+    def run_indexing(self, max_workers: Optional[int] = None) -> None:
+        """
+        Run indexing for all bigwig files in parallel using multiple CPUs.
+        """
+        n_bigwigs = len(self.bigwigs)
+        if n_bigwigs == 0:
+            return
+
+        if n_bigwigs <= 4 or max_workers == 1:
+            for bigwig in self.bigwigs:
+                bigwig.run_indexing(
+                    chromosome_offsets=self.local_chrom_ids_to_offset_matrix[bigwig.id]
+                )
+            return
+
+        if max_workers is None:
+            max_workers = min(os.cpu_count() or 4, n_bigwigs)
+
+        verbose = True  # logger.isEnabledFor(logging.INFO)
+
+        with ProcessPoolExecutor(max_workers=max_workers) as executor:
+            futures = {
+                executor.submit(
+                    prepare_index_for_bigwig,
+                    self.local_chrom_ids_to_offset_matrix[bigwig.id],
+                    bigwig.rtree_leaf_nodes,
+                ): bigwig
+                for bigwig in self.bigwigs
+            }
+
+            for i, future in enumerate(as_completed(futures), 1):
+                bigwig = futures[future]
+                sorted_data = future.result()
+                bigwig.chromosome_offsets = self.local_chrom_ids_to_offset_matrix[
+                    bigwig.id
+                ]
+                bigwig.finalize_indexing(sorted_data)
+
+                if verbose and (i % 100 == 0 or i == n_bigwigs):
+                    print(f"\rIndexed {i}/{n_bigwigs}", end="", flush=True)
+
+        if verbose:
+            print()
 
     def __len__(self) -> int:
         return len(self.bigwigs)
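
The new run_indexing follows a two-phase pattern: the picklable, CPU-bound part (prepare_index_for_bigwig, a pure function of NumPy arrays) runs in ProcessPoolExecutor workers, while finalize_indexing runs in the parent so that per-file state such as the NCLS index and the BigWigStore is created in the main process. A stripped-down sketch of the same pattern with hypothetical prepare/finalize functions, not the library's own code:

from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np


def prepare(values: np.ndarray) -> np.ndarray:
    # Picklable, CPU-bound work; runs inside a worker process.
    return np.sort(values)


def finalize(sorted_values: np.ndarray) -> dict:
    # Builds state that should only live in the parent process.
    return {"index": sorted_values}


if __name__ == "__main__":
    jobs = {name: np.random.randint(0, 10_000, size=1_000_000) for name in ("a", "b", "c", "d")}
    results = {}
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(prepare, arr): name for name, arr in jobs.items()}
        for future in as_completed(futures):
            name = futures[future]
            results[name] = finalize(future.result())  # finalize in the parent
    print(sorted(results))
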
@@ -217,13 +261,11 @@ def intervals(
         Returns: pandas dataframe of intervals (chrom, start, end, value)
 
         """
-        logging.info("Collecting intervals from BigWig files.")
+        logger.info("Collecting intervals from BigWig files.")
         interval_for_all_bigwigs = []
         n_bigwigs = len(self.bigwigs)
         for i, bw in enumerate(self.bigwigs):
-            logging.info(
-                f"Getting intervals for BigWig file {i}/{n_bigwigs}: {bw.path}"
-            )
+            logger.info(f"Getting intervals for BigWig file {i}/{n_bigwigs}: {bw.path}")
             interval_for_all_bigwigs.append(
                 bw.intervals(
                     include_chromosomes=include_chromosomes,

@@ -244,14 +286,14 @@ def intervals(
         intervals["start"] = intervals["start"].values.clip(padding) - padding  # type: ignore
         intervals["end"] = intervals["end"].values + padding  # type: ignore
         if merge:
-            logging.info(
+            logger.info(
                 "Merging intervals from different BigWig Files into one interval list."
             )
             intervals = merge_interval_dataframe(
                 intervals, is_sorted=False, allow_gap=merge_allow_gap
             )
         if blacklist is not None:
-            logging.info("Subtracting blacklisted regions.")
+            logger.info("Subtracting blacklisted regions.")
             intervals = subtract_interval_dataframe(
                 intervals=intervals, blacklist=blacklist, buffer=blacklist_buffer
             )

@@ -375,7 +417,7 @@ def create_global_position_system(
             # exceeding this would be a problem as all the positions
             # in bigwig files are encoded in 32 bits. Making a global
             # base position that exceeds this number would be a problem.
-            logging.warning(
+            logger.warning(
                 "The sum of sizes of the chromosomes exceeds the size of uint32"
             )
         for bigwig in self.bigwigs:
