
Commit e66839f

Add VCF partition code from sgkit
1 parent 39c53f2 commit e66839f

File tree

2 files changed: +318 −0 lines changed


bio2zarr/vcf_partition.py

Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@
from typing import Any, Dict, Optional, Sequence, Union

import dask
import fsspec
import numpy as np
from cyvcf2 import VCF

from sgkit.io.vcf.csi import CSI_EXTENSION, read_csi
from sgkit.io.vcf.tbi import TABIX_EXTENSION, read_tabix
from sgkit.io.vcf.utils import ceildiv, get_file_length
from sgkit.typing import PathType


def region_string(contig: str, start: int, end: Optional[int] = None) -> str:
    if end is not None:
        return f"{contig}:{start}-{end}"
    else:
        return f"{contig}:{start}-"


def get_tabix_path(
    vcf_path: PathType, storage_options: Optional[Dict[str, str]] = None
) -> Optional[str]:
    url = str(vcf_path)
    storage_options = storage_options or {}
    tbi_path = url + TABIX_EXTENSION
    with fsspec.open(url, **storage_options) as openfile:
        fs = openfile.fs
        if fs.exists(tbi_path):
            return tbi_path
        else:
            return None


def get_csi_path(
    vcf_path: PathType, storage_options: Optional[Dict[str, str]] = None
) -> Optional[str]:
    url = str(vcf_path)
    storage_options = storage_options or {}
    csi_path = url + CSI_EXTENSION
    with fsspec.open(url, **storage_options) as openfile:
        fs = openfile.fs
        if fs.exists(csi_path):
            return csi_path
        else:
            return None


def read_index(
    index_path: PathType, storage_options: Optional[Dict[str, str]] = None
) -> Any:
    url = str(index_path)
    if url.endswith(TABIX_EXTENSION):
        return read_tabix(url, storage_options=storage_options)
    elif url.endswith(CSI_EXTENSION):
        return read_csi(url, storage_options=storage_options)
    else:
        raise ValueError("Only .tbi or .csi indexes are supported.")


def get_sequence_names(vcf_path: PathType, index: Any) -> Any:
    try:
        # tbi stores sequence names
        return index.sequence_names
    except AttributeError:
        # ... but csi doesn't, so fall back to the VCF header
        return VCF(vcf_path).seqnames


def partition_into_regions(
    vcf_path: PathType,
    *,
    index_path: Optional[PathType] = None,
    num_parts: Optional[int] = None,
    target_part_size: Union[None, int, str] = None,
    storage_options: Optional[Dict[str, str]] = None,
) -> Optional[Sequence[str]]:
    """
    Calculate genomic region strings to partition a compressed VCF or BCF file into roughly equal parts.

    A ``.tbi`` or ``.csi`` file is used to find BGZF boundaries in the compressed VCF file, which are then
    used to divide the file into parts.

    The number of parts can be specified directly by providing ``num_parts``, or by specifying the
    desired size (in bytes) of each (compressed) part by providing ``target_part_size``.
    Exactly one of ``num_parts`` or ``target_part_size`` must be provided.

    Both ``num_parts`` and ``target_part_size`` serve as hints: the number of parts and their sizes
    may be more or less than these parameters.

    Parameters
    ----------
    vcf_path
        The path to the VCF file.
    index_path
        The path to the VCF index (``.tbi`` or ``.csi``), by default None. If not specified, the
        index path is constructed by appending the index suffix (``.tbi`` or ``.csi``) to the VCF path.
    num_parts
        The desired number of parts to partition the VCF file into, by default None.
    target_part_size
        The desired size, in bytes, of each (compressed) part of the partitioned VCF, by default None.
        If the value is a string, it may be specified using standard abbreviations, e.g. ``100MB`` is
        equivalent to ``100_000_000``.
    storage_options
        Any additional parameters for the storage backend (see ``fsspec.open``).

    Returns
    -------
    The region strings that partition the VCF file, or None if the VCF file should not be partitioned
    (so there is only a single partition).

    Raises
    ------
    ValueError
        If neither of ``num_parts`` or ``target_part_size`` has been specified.
    ValueError
        If both of ``num_parts`` and ``target_part_size`` have been specified.
    ValueError
        If either of ``num_parts`` or ``target_part_size`` is not a positive integer.
    """
    if num_parts is None and target_part_size is None:
        raise ValueError("One of num_parts or target_part_size must be specified")

    if num_parts is not None and target_part_size is not None:
        raise ValueError("Only one of num_parts or target_part_size may be specified")

    if num_parts is not None and num_parts < 1:
        raise ValueError("num_parts must be positive")

    if target_part_size is not None:
        target_part_size_bytes: int = dask.utils.parse_bytes(target_part_size)
        if target_part_size_bytes < 1:
            raise ValueError("target_part_size must be positive")

    # Calculate the desired part file boundaries
    file_length = get_file_length(vcf_path, storage_options=storage_options)
    if num_parts is not None:
        target_part_size_bytes = file_length // num_parts
    elif target_part_size_bytes is not None:
        num_parts = ceildiv(file_length, target_part_size_bytes)
    if num_parts == 1:
        return None
    part_lengths = np.array([i * target_part_size_bytes for i in range(num_parts)])

    if index_path is None:
        index_path = get_tabix_path(vcf_path, storage_options=storage_options)
        if index_path is None:
            index_path = get_csi_path(vcf_path, storage_options=storage_options)
            if index_path is None:
                raise ValueError("Cannot find .tbi or .csi file.")

    # Get the file offsets from .tbi/.csi
    index = read_index(index_path, storage_options=storage_options)
    sequence_names = get_sequence_names(vcf_path, index)
    file_offsets, region_contig_indexes, region_positions = index.offsets()

    # Search the file offsets to find which indexes the part lengths fall at
    ind = np.searchsorted(file_offsets, part_lengths)

    # Drop any parts that are greater than the file offsets (these will be covered by a region with no end)
    ind = np.delete(ind, ind >= len(file_offsets))  # type: ignore[no-untyped-call]

    # Drop any duplicates
    ind = np.unique(ind)  # type: ignore[no-untyped-call]

    # Calculate region contig and start for each index
    region_contigs = region_contig_indexes[ind]
    region_starts = region_positions[ind]

    # Build region query strings
    regions = []
    for i in range(len(region_starts)):
        contig = sequence_names[region_contigs[i]]
        start = region_starts[i]

        if i == len(region_starts) - 1:  # final region
            regions.append(region_string(contig, start))
        else:
            next_contig = sequence_names[region_contigs[i + 1]]
            next_start = region_starts[i + 1]
            end = next_start - 1  # subtract one since positions are inclusive
            if next_contig == contig:  # contig doesn't change
                regions.append(region_string(contig, start, end))
            else:  # contig changes, so need two regions (or possibly more if any sequences were skipped)
                regions.append(region_string(contig, start))
                for ri in range(region_contigs[i] + 1, region_contigs[i + 1]):
                    regions.append(sequence_names[ri])  # pragma: no cover
                regions.append(region_string(next_contig, 1, end))
    # Add any sequences at the end that were not skipped
    for ri in range(region_contigs[-1] + 1, len(sequence_names)):
        regions.append(sequence_names[ri])  # pragma: no cover

    return regions
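
For reference, a minimal usage sketch of the function added above, assuming the module is importable as ``bio2zarr.vcf_partition`` (the path used in this commit) and that ``example.vcf.gz`` is a hypothetical BGZF-compressed, indexed VCF:

from cyvcf2 import VCF

from bio2zarr.vcf_partition import partition_into_regions

vcf_path = "example.vcf.gz"  # hypothetical indexed, BGZF-compressed VCF

# Ask for roughly 4 parts; the result is hint-based, and may be None if the
# file is small enough to be processed as a single part.
regions = partition_into_regions(vcf_path, num_parts=4)

for region in regions or []:
    # cyvcf2 VCF objects are callable with a region string and yield the
    # variants falling in that region (this requires the .tbi/.csi index).
    count = sum(1 for _ in VCF(vcf_path)(region))
    print(region, count)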

tests/test_vcf_partition.py

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
import pytest

from sgkit.io.vcf import partition_into_regions
from sgkit.io.vcf.vcf_reader import count_variants

from .utils import path_for_test


@pytest.mark.parametrize(
    "vcf_file",
    [
        "CEUTrio.20.21.gatk3.4.g.bcf",
        "CEUTrio.20.21.gatk3.4.g.vcf.bgz",
        "NA12878.prod.chr20snippet.g.vcf.gz",
    ],
)
@pytest.mark.parametrize(
    "is_path",
    [True, False],
)
def test_partition_into_regions__num_parts(shared_datadir, vcf_file, is_path):
    vcf_path = path_for_test(shared_datadir, vcf_file, is_path)

    regions = partition_into_regions(vcf_path, num_parts=4)

    assert regions is not None
    part_variant_counts = [count_variants(vcf_path, region) for region in regions]
    total_variants = count_variants(vcf_path)

    assert sum(part_variant_counts) == total_variants


@pytest.mark.parametrize(
    "is_path",
    [True, False],
)
def test_partition_into_regions__num_parts_large(shared_datadir, is_path):
    vcf_path = path_for_test(shared_datadir, "CEUTrio.20.21.gatk3.4.g.vcf.bgz", is_path)

    regions = partition_into_regions(vcf_path, num_parts=100)
    assert regions is not None
    assert len(regions) == 18

    part_variant_counts = [count_variants(vcf_path, region) for region in regions]
    total_variants = count_variants(vcf_path)

    assert sum(part_variant_counts) == total_variants


@pytest.mark.parametrize(
    "target_part_size",
    [
        100_000,
        "100KB",
        "100 kB",
    ],
)
@pytest.mark.parametrize(
    "is_path",
    [True, False],
)
def test_partition_into_regions__target_part_size(
    shared_datadir, is_path, target_part_size
):
    vcf_path = path_for_test(shared_datadir, "CEUTrio.20.21.gatk3.4.g.vcf.bgz", is_path)

    regions = partition_into_regions(vcf_path, target_part_size=target_part_size)
    assert regions is not None
    assert len(regions) == 5

    part_variant_counts = [count_variants(vcf_path, region) for region in regions]
    total_variants = count_variants(vcf_path)

    assert sum(part_variant_counts) == total_variants


@pytest.mark.parametrize(
    "is_path",
    [True, False],
)
def test_partition_into_regions__invalid_arguments(shared_datadir, is_path):
    vcf_path = path_for_test(shared_datadir, "CEUTrio.20.21.gatk3.4.g.vcf.bgz", is_path)

    with pytest.raises(
        ValueError, match=r"One of num_parts or target_part_size must be specified"
    ):
        partition_into_regions(vcf_path)

    with pytest.raises(
        ValueError, match=r"Only one of num_parts or target_part_size may be specified"
    ):
        partition_into_regions(vcf_path, num_parts=4, target_part_size=100_000)

    with pytest.raises(ValueError, match=r"num_parts must be positive"):
        partition_into_regions(vcf_path, num_parts=0)

    with pytest.raises(ValueError, match=r"target_part_size must be positive"):
        partition_into_regions(vcf_path, target_part_size=0)


@pytest.mark.parametrize(
    "is_path",
    [True, False],
)
def test_partition_into_regions__one_part(shared_datadir, is_path):
    vcf_path = path_for_test(shared_datadir, "CEUTrio.20.21.gatk3.4.g.vcf.bgz", is_path)
    assert partition_into_regions(vcf_path, num_parts=1) is None


@pytest.mark.parametrize(
    "is_path",
    [True, False],
)
def test_partition_into_regions__missing_index(shared_datadir, is_path):
    vcf_path = path_for_test(
        shared_datadir, "CEUTrio.20.21.gatk3.4.noindex.g.vcf.bgz", is_path
    )
    with pytest.raises(ValueError, match=r"Cannot find .tbi or .csi file."):
        partition_into_regions(vcf_path, num_parts=2)

    bogus_index_path = path_for_test(
        shared_datadir, "CEUTrio.20.21.gatk3.4.noindex.g.vcf.bgz.index", is_path
    )
    with pytest.raises(ValueError, match=r"Only .tbi or .csi indexes are supported."):
        partition_into_regions(vcf_path, index_path=bogus_index_path, num_parts=2)
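
The tests above all rely on the same invariant: iterating every region returned by ``partition_into_regions`` visits each variant exactly once, so per-region counts sum to the whole-file count. Below is a hedged sketch of that check done in parallel with the standard library; the file name is hypothetical, and sgkit's ``count_variants`` helper is replaced by a plain cyvcf2 loop:

from concurrent.futures import ProcessPoolExecutor

from cyvcf2 import VCF

from bio2zarr.vcf_partition import partition_into_regions


def count_region(vcf_path, region):
    # Each worker opens its own handle; cyvcf2 readers are not picklable.
    return sum(1 for _ in VCF(vcf_path)(region))


if __name__ == "__main__":
    vcf_path = "example.vcf.gz"  # hypothetical indexed, BGZF-compressed VCF
    regions = partition_into_regions(vcf_path, target_part_size="100MB") or []
    with ProcessPoolExecutor() as executor:
        counts = list(executor.map(count_region, [vcf_path] * len(regions), regions))
    # The per-region counts should sum to the total variant count of the file.
    print(sum(counts))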
