
Commit 9193834

Merge pull request #22 from tomwhite/vcf-partition
Port VCF partitioning code from sgkit
2 parents 75e8039 + fb451be commit 9193834

19 files changed: +840 -0 lines changed

bio2zarr/csi.py

Lines changed: 150 additions & 0 deletions
```python
"""Functions for parsing CSI files into Python objects so they can be inspected.

The implementation follows the [CSI index file format](http://samtools.github.io/hts-specs/CSIv1.pdf).
"""
from dataclasses import dataclass
from typing import Any, Dict, Optional, Sequence

import numpy as np

from bio2zarr.typing import PathType
from bio2zarr.utils import (
    get_file_offset,
    open_gzip,
    read_bytes_as_tuple,
    read_bytes_as_value,
)

CSI_EXTENSION = ".csi"


@dataclass
class Chunk:
    cnk_beg: int
    cnk_end: int


@dataclass
class Bin:
    bin: int
    loffset: int
    chunks: Sequence[Chunk]


@dataclass
class CSIIndex:
    min_shift: int
    depth: int
    aux: str
    bins: Sequence[Sequence[Bin]]
    record_counts: Sequence[int]
    n_no_coor: int

    def offsets(self) -> Any:
        pseudo_bin = bin_limit(self.min_shift, self.depth) + 1

        file_offsets = []
        contig_indexes = []
        positions = []
        for contig_index, bins in enumerate(self.bins):
            # bins may be in any order within a contig, so sort by loffset
            for bin in sorted(bins, key=lambda b: b.loffset):
                if bin.bin == pseudo_bin:
                    continue  # skip pseudo bins
                file_offset = get_file_offset(bin.loffset)
                position = get_first_locus_in_bin(self, bin.bin)
                file_offsets.append(file_offset)
                contig_indexes.append(contig_index)
                positions.append(position)

        return np.array(file_offsets), np.array(contig_indexes), np.array(positions)


def bin_limit(min_shift: int, depth: int) -> int:
    """Defined in CSI spec"""
    return ((1 << (depth + 1) * 3) - 1) // 7


def get_first_bin_in_level(level: int) -> int:
    return ((1 << level * 3) - 1) // 7


def get_level_size(level: int) -> int:
    return 1 << level * 3


def get_level_for_bin(csi: CSIIndex, bin: int) -> int:
    for i in range(csi.depth, -1, -1):
        if bin >= get_first_bin_in_level(i):
            return i
    raise ValueError(f"Cannot find level for bin {bin}.")  # pragma: no cover


def get_first_locus_in_bin(csi: CSIIndex, bin: int) -> int:
    level = get_level_for_bin(csi, bin)
    first_bin_on_level = get_first_bin_in_level(level)
    level_size = get_level_size(level)
    max_span = 1 << (csi.min_shift + 3 * csi.depth)
    return (bin - first_bin_on_level) * (max_span // level_size) + 1


def read_csi(
    file: PathType, storage_options: Optional[Dict[str, str]] = None
) -> CSIIndex:
    """Parse a CSI file into a `CSIIndex` object.

    Parameters
    ----------
    file : PathType
        The path to the CSI file.

    Returns
    -------
    CSIIndex
        An object representing a CSI index.

    Raises
    ------
    ValueError
        If the file is not a CSI file.
    """
    with open_gzip(file, storage_options=storage_options) as f:
        magic = read_bytes_as_value(f, "4s")
        if magic != b"CSI\x01":
            raise ValueError("File not in CSI format.")

        min_shift, depth, l_aux = read_bytes_as_tuple(f, "<3i")
        aux = read_bytes_as_value(f, f"{l_aux}s", "")
        n_ref = read_bytes_as_value(f, "<i")

        pseudo_bin = bin_limit(min_shift, depth) + 1

        bins = []
        record_counts = []

        if n_ref > 0:
            for _ in range(n_ref):
                n_bin = read_bytes_as_value(f, "<i")
                seq_bins = []
                record_count = -1
                for _ in range(n_bin):
                    bin, loffset, n_chunk = read_bytes_as_tuple(f, "<IQi")
                    chunks = []
                    for _ in range(n_chunk):
                        chunk = Chunk(*read_bytes_as_tuple(f, "<QQ"))
                        chunks.append(chunk)
                    seq_bins.append(Bin(bin, loffset, chunks))

                    if bin == pseudo_bin:
                        assert len(chunks) == 2
                        n_mapped, n_unmapped = chunks[1].cnk_beg, chunks[1].cnk_end
                        record_count = n_mapped + n_unmapped
                bins.append(seq_bins)
                record_counts.append(record_count)

        n_no_coor = read_bytes_as_value(f, "<Q", 0)

        assert len(f.read(1)) == 0

        return CSIIndex(min_shift, depth, aux, bins, record_counts, n_no_coor)
```
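
For orientation, here is a minimal usage sketch (the index path is hypothetical) showing how `read_csi` and `CSIIndex.offsets` fit together when computing partition boundaries:

```python
from bio2zarr.csi import read_csi

# Hypothetical path to a CSI index accompanying a BGZF-compressed VCF.
index = read_csi("example.vcf.gz.csi")

# offsets() yields parallel arrays: the BGZF file offset at which each
# (non-pseudo) bin starts, the contig it belongs to, and the 1-based
# position of the first locus in that bin.
file_offsets, contig_indexes, positions = index.offsets()
print(file_offsets[:5], contig_indexes[:5], positions[:5])
```

These arrays give natural split points: a partitioner can cut the compressed file at any of the returned `file_offsets` and know which contig and position each piece starts at.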

bio2zarr/tbi.py

Lines changed: 149 additions & 0 deletions
```python
"""Functions for parsing tabix files into Python objects so they can be inspected.

The implementation follows the [Tabix index file format](https://samtools.github.io/hts-specs/tabix.pdf).
"""
from dataclasses import dataclass
from typing import Any, Dict, Optional, Sequence

import numpy as np

from bio2zarr.typing import PathType
from bio2zarr.utils import (
    get_file_offset,
    open_gzip,
    read_bytes_as_tuple,
    read_bytes_as_value,
)

TABIX_EXTENSION = ".tbi"
TABIX_LINEAR_INDEX_INTERVAL_SIZE = 1 << 14  # 16kb interval size


@dataclass
class Header:
    n_ref: int
    format: int
    col_seq: int
    col_beg: int
    col_end: int
    meta: int
    skip: int
    l_nm: int


@dataclass
class Chunk:
    cnk_beg: int
    cnk_end: int


@dataclass
class Bin:
    bin: int
    chunks: Sequence[Chunk]


@dataclass
class TabixIndex:
    header: Header
    sequence_names: Sequence[str]
    bins: Sequence[Sequence[Bin]]
    linear_indexes: Sequence[Sequence[int]]
    record_counts: Sequence[int]
    n_no_coor: int

    def offsets(self) -> Any:
        # Combine the linear indexes into one stacked array
        linear_indexes = self.linear_indexes
        linear_index = np.hstack([np.array(li) for li in linear_indexes])

        # Create file offsets for each element in the linear index
        file_offsets = np.array([get_file_offset(vfp) for vfp in linear_index])

        # Calculate corresponding contigs and positions for each element
        # in the linear index
        contig_indexes = np.hstack(
            [np.full(len(li), i) for (i, li) in enumerate(linear_indexes)]
        )
        # positions are 1-based and inclusive
        positions = np.hstack(
            [
                np.arange(len(li)) * TABIX_LINEAR_INDEX_INTERVAL_SIZE + 1
                for li in linear_indexes
            ]
        )
        assert len(file_offsets) == len(contig_indexes)
        assert len(file_offsets) == len(positions)

        return file_offsets, contig_indexes, positions


def read_tabix(
    file: PathType, storage_options: Optional[Dict[str, str]] = None
) -> TabixIndex:
    """Parse a tabix file into a `TabixIndex` object.

    Parameters
    ----------
    file : PathType
        The path to the tabix file.

    Returns
    -------
    TabixIndex
        An object representing a tabix index.

    Raises
    ------
    ValueError
        If the file is not a tabix file.
    """
    with open_gzip(file, storage_options=storage_options) as f:
        magic = read_bytes_as_value(f, "4s")
        if magic != b"TBI\x01":
            raise ValueError("File not in Tabix format.")

        header = Header(*read_bytes_as_tuple(f, "<8i"))

        sequence_names = []
        bins = []
        linear_indexes = []
        record_counts = []

        if header.l_nm > 0:
            names = read_bytes_as_value(f, f"<{header.l_nm}s")
            # Convert \0-terminated names to strings
            sequence_names = [str(name, "utf-8") for name in names.split(b"\x00")[:-1]]

            for _ in range(header.n_ref):
                n_bin = read_bytes_as_value(f, "<i")
                seq_bins = []
                record_count = -1
                for _ in range(n_bin):
                    bin, n_chunk = read_bytes_as_tuple(f, "<Ii")
                    chunks = []
                    for _ in range(n_chunk):
                        chunk = Chunk(*read_bytes_as_tuple(f, "<QQ"))
                        chunks.append(chunk)
                    seq_bins.append(Bin(bin, chunks))

                    if bin == 37450:  # pseudo-bin, see section 5.2 of BAM spec
                        assert len(chunks) == 2
                        n_mapped, n_unmapped = chunks[1].cnk_beg, chunks[1].cnk_end
                        record_count = n_mapped + n_unmapped
                n_intv = read_bytes_as_value(f, "<i")
                linear_index = []
                for _ in range(n_intv):
                    ioff = read_bytes_as_value(f, "<Q")
                    linear_index.append(ioff)
                bins.append(seq_bins)
                linear_indexes.append(linear_index)
                record_counts.append(record_count)

        n_no_coor = read_bytes_as_value(f, "<Q", 0)

        assert len(f.read(1)) == 0

        return TabixIndex(
            header, sequence_names, bins, linear_indexes, record_counts, n_no_coor
        )
```
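
A parallel sketch for tabix indexes (again with a hypothetical path): `read_tabix` exposes the sequence names and per-contig record counts alongside the same three offset arrays:

```python
from bio2zarr.tbi import read_tabix

# Hypothetical path to a tabix index accompanying a BGZF-compressed VCF.
index = read_tabix("example.vcf.gz.tbi")

print(index.sequence_names)   # contig names stored in the index
print(index.record_counts)    # per-contig record counts (-1 when no pseudo-bin)
file_offsets, contig_indexes, positions = index.offsets()
```

Unlike the CSI version, these positions come from the linear index, so they fall on the fixed 16 kb interval boundaries defined by `TABIX_LINEAR_INDEX_INTERVAL_SIZE`.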

bio2zarr/typing.py

Lines changed: 4 additions & 0 deletions
```python
from pathlib import Path
from typing import Union

PathType = Union[str, Path]
```

bio2zarr/utils.py

Lines changed: 82 additions & 0 deletions
```python
import struct
from typing import IO, Any, Dict, Optional, Sequence

import fsspec

from bio2zarr.typing import PathType


def ceildiv(a: int, b: int) -> int:
    """Safe integer ceil function"""
    return -(-a // b)


def get_file_length(
    path: PathType, storage_options: Optional[Dict[str, str]] = None
) -> int:
    """Get the length of a file in bytes."""
    url = str(path)
    storage_options = storage_options or {}
    with fsspec.open(url, **storage_options) as openfile:
        fs = openfile.fs
        size = fs.size(url)
        if size is None:
            raise IOError(f"Cannot determine size of file {url}")  # pragma: no cover
        return int(size)


def get_file_offset(vfp: int) -> int:
    """Convert a block compressed virtual file pointer to a file offset."""
    address_mask = 0xFFFFFFFFFFFF
    return vfp >> 16 & address_mask


def read_bytes_as_value(f: IO[Any], fmt: str, nodata: Optional[Any] = None) -> Any:
    """Read bytes using a `struct` format string and return the unpacked data value.

    Parameters
    ----------
    f : IO[Any]
        The IO stream to read bytes from.
    fmt : str
        A Python `struct` format string.
    nodata : Optional[Any], optional
        The value to return in case there is no further data in the stream,
        by default None

    Returns
    -------
    Any
        The unpacked data value read from the stream.
    """
    data = f.read(struct.calcsize(fmt))
    if not data:
        return nodata
    values = struct.Struct(fmt).unpack(data)
    assert len(values) == 1
    return values[0]


def read_bytes_as_tuple(f: IO[Any], fmt: str) -> Sequence[Any]:
    """Read bytes using a `struct` format string and return the unpacked data values.

    Parameters
    ----------
    f : IO[Any]
        The IO stream to read bytes from.
    fmt : str
        A Python `struct` format string.

    Returns
    -------
    Sequence[Any]
        The unpacked data values read from the stream.
    """
    data = f.read(struct.calcsize(fmt))
    return struct.Struct(fmt).unpack(data)


def open_gzip(path: PathType, storage_options: Optional[Dict[str, str]]) -> IO[Any]:
    url = str(path)
    storage_options = storage_options or {}
    openfile: IO[Any] = fsspec.open(url, compression="gzip", **storage_options)
    return openfile
```
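
As a quick check of the virtual file pointer arithmetic in `get_file_offset`: a BGZF virtual offset packs the compressed block offset in the upper 48 bits and the within-block offset in the lower 16 bits, so shifting right by 16 recovers the block offset. A small illustrative snippet (the numbers are arbitrary):

```python
from bio2zarr.utils import get_file_offset

coffset, uoffset = 123_456, 42          # arbitrary example values
vfp = (coffset << 16) | uoffset         # pack into a virtual file pointer
assert get_file_offset(vfp) == coffset  # the block offset is recovered
```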
