Skip to content

Commit fb35fee

Browse files
Merge pull request #30 from jeromekelleher/fixup-files
Fixup files
2 parents 30b60ca + 496cfed commit fb35fee

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

bio2zarr/plink.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import numpy as np
2+
import zarr
3+
import bed_reader
4+
5+
from . import core
6+
7+
8+
def encode_bed_partition_genotypes(
    bed_path, zarr_path, start_variant, end_variant, encoder_threads=8
):
    """Encode genotypes for the variant range [start_variant, end_variant)
    from the PLINK .bed file at bed_path into the pre-created zarr store
    at zarr_path.

    start_variant must fall on a call_genotype chunk boundary so that each
    partition writes whole chunks only.
    """
    reader = bed_reader.open_bed(bed_path, num_threads=1)

    root = zarr.group(store=zarr.DirectoryStore(zarr_path))
    genotype = core.BufferedArray(root["call_genotype"])
    mask = core.BufferedArray(root["call_genotype_mask"])
    phased = core.BufferedArray(root["call_genotype_phased"])
    chunk_length = genotype.array.chunks[0]
    assert start_variant % chunk_length == 0

    with core.ThreadedZarrEncoder(
        [genotype, phased, mask], encoder_threads
    ) as encoder:
        chunk_start = start_variant
        while chunk_start < end_variant:
            chunk_stop = min(chunk_start + chunk_length, end_variant)
            # Transpose so we iterate one variant (row) at a time, with one
            # int8 value per sample in that row.
            variant_rows = reader.read(
                index=slice(chunk_start, chunk_stop), dtype="int8"
            ).T
            # Note could do this without iterating over rows, but it's a bit
            # simpler and the bottleneck is in the encoding step anyway. It's
            # also nice to have updates on the progress monitor.
            for row in variant_rows:
                j = encoder.next_buffer_row()
                dest = genotype.buff[j]
                # NOTE(review): dest is not explicitly zeroed before the
                # assignments below — assumes BufferedArray hands back rows
                # that start (or are reset) at zero; confirm against core.
                dest[row == -127] = -1
                dest[row == 2] = 1
                dest[row == 1, 0] = 1
                phased.buff[j] = False
                mask.buff[j] = dest == -1
                core.update_progress(1)
            chunk_start = chunk_stop
41+
42+
43+
def _create_call_array(root, name, *, dtype, shape, chunks, dimensions):
    """Create one empty zarr array under root with the dataset's standard
    compressor and xarray-style dimension attributes."""
    a = root.empty(
        name,
        dtype=dtype,
        shape=list(shape),
        chunks=list(chunks),
        compressor=core.default_compressor,
    )
    a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)


def convert(
    bed_path,
    zarr_path,
    *,
    show_progress=False,
    worker_processes=1,
    chunk_length=None,
    chunk_width=None,
):
    """Convert the PLINK dataset at bed_path to a zarr store at zarr_path.

    Creates the call_genotype, call_genotype_mask and call_genotype_phased
    arrays, then encodes the genotypes in parallel partitions of whole
    chunks via encode_bed_partition_genotypes.

    :param bed_path: path to the PLINK .bed file.
    :param zarr_path: destination directory for the zarr store
        (overwritten if it exists).
    :param show_progress: show a progress monitor while converting.
    :param worker_processes: number of parallel worker processes.
    :param chunk_length: variants per chunk (default 10_000).
    :param chunk_width: samples per chunk (default 1000).
    """
    # Open only long enough to read the dataset dimensions; the context
    # manager releases the file handle deterministically instead of relying
    # on garbage collection (the previous `del bed`).
    with bed_reader.open_bed(bed_path, num_threads=1) as bed:
        n = bed.iid_count  # number of samples
        m = bed.sid_count  # number of variants

    # FIXME these defaults should be centralised / configurable
    if chunk_width is None:
        chunk_width = 1000
    if chunk_length is None:
        chunk_length = 10_000

    store = zarr.DirectoryStore(zarr_path)
    root = zarr.group(store=store, overwrite=True)

    ploidy = 2
    shape = [m, n]
    chunks = [chunk_length, chunk_width]
    dimensions = ["variants", "samples"]

    _create_call_array(
        root,
        "call_genotype_phased",
        dtype="bool",
        shape=shape,
        chunks=chunks,
        dimensions=dimensions,
    )

    # The genotype and mask arrays carry an extra ploidy dimension; zarr
    # normalises the 2-element chunks spec against the 3-D shape.
    shape += [ploidy]
    dimensions += ["ploidy"]
    _create_call_array(
        root,
        "call_genotype",
        dtype="i8",
        shape=shape,
        chunks=chunks,
        dimensions=dimensions,
    )
    _create_call_array(
        root,
        "call_genotype_mask",
        dtype="bool",
        shape=shape,
        chunks=chunks,
        dimensions=dimensions,
    )

    chunks_per_future = 2  # FIXME - make a parameter
    # Partition the variants into contiguous runs of whole chunks so each
    # worker writes chunk-aligned slices (see the assert in the encoder).
    start = 0
    partitions = []
    while start < m:
        stop = min(m, start + chunk_length * chunks_per_future)
        partitions.append((start, stop))
        start = stop
    assert start == m

    progress_config = core.ProgressConfig(
        total=m, title="Convert", units="vars", show=show_progress
    )
    with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
        for start, end in partitions:
            pwm.submit(encode_bed_partition_genotypes, bed_path, zarr_path, start, end)

    # TODO also add atomic swap like VCF. Should be abstracted to
    # share basic code for setting up the variation dataset zarr
    zarr.consolidate_metadata(zarr_path)

bio2zarr/vcf_partition.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from typing import Any, Dict, Optional, Sequence, Union
22

3-
import dask
43
import fsspec
54
import numpy as np
65
from cyvcf2 import VCF
6+
import humanfriendly
77

88
from bio2zarr.csi import CSI_EXTENSION, read_csi
99
from bio2zarr.tbi import TABIX_EXTENSION, read_tabix
@@ -128,7 +128,10 @@ def partition_into_regions(
128128
raise ValueError("num_parts must be positive")
129129

130130
if target_part_size is not None:
131-
target_part_size_bytes: int = dask.utils.parse_bytes(target_part_size)
131+
if isinstance(target_part_size, int):
132+
target_part_size_bytes = target_part_size
133+
else:
134+
target_part_size_bytes = humanfriendly.parse_size(target_part_size)
132135
if target_part_size_bytes < 1:
133136
raise ValueError("target_part_size must be positive")
134137

File renamed without changes.

0 commit comments

Comments
 (0)