Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions bio2zarr/plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import numpy as np
import zarr

from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS

from . import core

logger = logging.getLogger(__name__)
Expand All @@ -17,8 +19,7 @@ def encode_genotypes_slice(bed_path, zarr_path, start, stop):
# the correct approach is, but it is important to note that the
# 0th allele is *not* necessarily the REF for these datasets.
bed = bed_reader.open_bed(bed_path, num_threads=1, count_A1=False)
store = zarr.DirectoryStore(zarr_path)
root = zarr.group(store=store)
root = zarr.open(store=zarr_path, mode="a", **ZARR_FORMAT_KWARGS)
gt = core.BufferedArray(root["call_genotype"], start)
gt_mask = core.BufferedArray(root["call_genotype_mask"], start)
gt_phased = core.BufferedArray(root["call_genotype_phased"], start)
Expand Down Expand Up @@ -73,8 +74,7 @@ def convert(
if variants_chunk_size is None:
variants_chunk_size = 10_000

store = zarr.DirectoryStore(zarr_path)
root = zarr.group(store=store, overwrite=True)
root = zarr.open_group(store=zarr_path, mode="w", **ZARR_FORMAT_KWARGS)

ploidy = 2
shape = [m, n]
Expand All @@ -88,7 +88,8 @@ def convert(

a = root.array(
"sample_id",
bed.iid,
data=bed.iid,
shape=bed.iid.shape,
dtype="str",
compressor=default_compressor,
chunks=(samples_chunk_size,),
Expand All @@ -100,7 +101,8 @@ def convert(
# fetching repeatedly from bim file
a = root.array(
"variant_position",
bed.bp_position,
data=bed.bp_position,
shape=bed.bp_position.shape,
dtype=np.int32,
compressor=default_compressor,
chunks=(variants_chunk_size,),
Expand All @@ -111,7 +113,8 @@ def convert(
alleles = np.stack([bed.allele_1, bed.allele_2], axis=1)
a = root.array(
"variant_allele",
alleles,
data=alleles,
shape=alleles.shape,
dtype="str",
compressor=default_compressor,
chunks=(variants_chunk_size,),
Expand All @@ -121,31 +124,34 @@ def convert(

# TODO remove this?
a = root.empty(
"call_genotype_phased",
name="call_genotype_phased",
dtype="bool",
shape=list(shape),
chunks=list(chunks),
compressor=default_compressor,
**ZARR_FORMAT_KWARGS,
)
a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)

shape += [ploidy]
dimensions += ["ploidy"]
a = root.empty(
"call_genotype",
name="call_genotype",
dtype="i1",
shape=list(shape),
chunks=list(chunks),
compressor=default_compressor,
**ZARR_FORMAT_KWARGS,
)
a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)

a = root.empty(
"call_genotype_mask",
name="call_genotype_mask",
dtype="bool",
shape=list(shape),
chunks=list(chunks),
compressor=default_compressor,
**ZARR_FORMAT_KWARGS,
)
a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)

Expand All @@ -154,7 +160,7 @@ def convert(
num_slices = max(1, worker_processes * 4)
slices = core.chunk_aligned_slices(a, num_slices)

total_chunks = sum(a.nchunks for a in root.values())
total_chunks = sum(a.nchunks for _, a in root.arrays())

progress_config = core.ProgressConfig(
total=total_chunks, title="Convert", units="chunks", show=show_progress
Expand All @@ -171,8 +177,7 @@ def convert(
# FIXME do this more efficiently - currently reading the whole thing
# in for convenience, and also comparing call-by-call
def validate(bed_path, zarr_path):
store = zarr.DirectoryStore(zarr_path)
root = zarr.group(store=store)
root = zarr.open(store=zarr_path, mode="r")
call_genotype = root["call_genotype"][:]

bed = bed_reader.open_bed(bed_path, count_A1=False, num_threads=1)
Expand Down
30 changes: 16 additions & 14 deletions bio2zarr/vcf2zarr/vcz.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import numpy as np
import zarr

from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS

from .. import constants, core, provenance
from . import icf

Expand Down Expand Up @@ -532,8 +534,7 @@ def init(
)

self.path.mkdir()
store = zarr.DirectoryStore(self.path)
root = zarr.group(store=store)
root = zarr.open(store=self.path, mode="a", **ZARR_FORMAT_KWARGS)
root.attrs.update(
{
"vcf_zarr_version": "0.2",
Expand All @@ -549,8 +550,7 @@ def init(
self.wip_path.mkdir()
self.arrays_path.mkdir()
self.partitions_path.mkdir()
store = zarr.DirectoryStore(self.arrays_path)
root = zarr.group(store=store)
root = zarr.open(store=self.arrays_path, mode="a", **ZARR_FORMAT_KWARGS)

total_chunks = 0
for field in self.schema.fields:
Expand All @@ -574,7 +574,8 @@ def encode_samples(self, root):
raise ValueError("Subsetting or reordering samples not supported currently")
array = root.array(
"sample_id",
[sample.id for sample in self.schema.samples],
data=[sample.id for sample in self.schema.samples],
shape=len(self.schema.samples),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
chunks=(self.schema.samples_chunk_size,),
Expand All @@ -585,15 +586,17 @@ def encode_samples(self, root):
def encode_contig_id(self, root):
array = root.array(
"contig_id",
[contig.id for contig in self.schema.contigs],
data=[contig.id for contig in self.schema.contigs],
shape=len(self.schema.contigs),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
)
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
if all(contig.length is not None for contig in self.schema.contigs):
array = root.array(
"contig_length",
[contig.length for contig in self.schema.contigs],
data=[contig.length for contig in self.schema.contigs],
shape=len(self.schema.contigs),
dtype=np.int64,
compressor=DEFAULT_ZARR_COMPRESSOR,
)
Expand All @@ -604,7 +607,8 @@ def encode_filter_id(self, root):
# https://github.com/sgkit-dev/vcf-zarr-spec/issues/19
array = root.array(
"filter_id",
[filt.id for filt in self.schema.filters],
data=[filt.id for filt in self.schema.filters],
shape=len(self.schema.filters),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
)
Expand All @@ -618,14 +622,15 @@ def init_array(self, root, array_spec, variants_dim_size):
# Truncate the variants dimension is max_variant_chunks was specified
shape[0] = variants_dim_size
a = root.empty(
array_spec.name,
name=array_spec.name,
shape=shape,
chunks=array_spec.chunks,
dtype=array_spec.dtype,
compressor=numcodecs.get_codec(array_spec.compressor),
filters=[numcodecs.get_codec(filt) for filt in array_spec.filters],
object_codec=object_codec,
dimension_separator=self.metadata.dimension_separator,
**ZARR_FORMAT_KWARGS,
)
a.attrs.update(
{
Expand Down Expand Up @@ -690,9 +695,7 @@ def init_partition_array(self, partition_index, name):
# Overwrite any existing WIP files
wip_path = self.wip_partition_array_path(partition_index, name)
shutil.copytree(src, wip_path, dirs_exist_ok=True)
store = zarr.DirectoryStore(self.wip_partition_path(partition_index))
wip_root = zarr.group(store=store)
array = wip_root[name]
array = zarr.open_array(store=wip_path, mode="a")
logger.debug(f"Opened empty array {array.name} <{array.dtype}> @ {wip_path}")
return array

Expand Down Expand Up @@ -909,8 +912,7 @@ def finalise(self, show_progress=False):
def create_index(self):
"""Create an index to support efficient region queries."""

store = zarr.DirectoryStore(self.path)
root = zarr.open_group(store=store, mode="r+")
root = zarr.open_group(store=self.path, mode="r+")

contig = root["variant_contig"]
pos = root["variant_position"]
Expand Down
4 changes: 1 addition & 3 deletions bio2zarr/vcf2zarr/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ def assert_format_val_equal(vcf_val, zarr_val, vcf_type, vcf_number):


def verify(vcf_path, zarr_path, show_progress=False):
store = zarr.DirectoryStore(zarr_path)

root = zarr.group(store=store)
root = zarr.open(store=zarr_path, mode="r")
pos = root["variant_position"][:]
allele = root["variant_allele"][:]
chrom = root["contig_id"][:][root["variant_contig"][:]]
Expand Down
13 changes: 13 additions & 0 deletions bio2zarr/zarr_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import zarr
from packaging.version import Version


def zarr_v3() -> bool:
return Version(zarr.__version__).major >= 3


if zarr_v3():
# Use zarr format v2 even when running with zarr-python v3
ZARR_FORMAT_KWARGS = dict(zarr_format=2)
else:
ZARR_FORMAT_KWARGS = dict()