diff --git a/bio2zarr/plink.py b/bio2zarr/plink.py
index f558ebde..88843db1 100644
--- a/bio2zarr/plink.py
+++ b/bio2zarr/plink.py
@@ -6,6 +6,8 @@
 import numpy as np
 import zarr
 
+from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS
+
 from . import core
 
 logger = logging.getLogger(__name__)
@@ -17,8 +19,7 @@ def encode_genotypes_slice(bed_path, zarr_path, start, stop):
     # the correct approach is, but it is important to note that the
     # 0th allele is *not* necessarily the REF for these datasets.
     bed = bed_reader.open_bed(bed_path, num_threads=1, count_A1=False)
-    store = zarr.DirectoryStore(zarr_path)
-    root = zarr.group(store=store)
+    root = zarr.open(store=zarr_path, mode="a", **ZARR_FORMAT_KWARGS)
     gt = core.BufferedArray(root["call_genotype"], start)
     gt_mask = core.BufferedArray(root["call_genotype_mask"], start)
     gt_phased = core.BufferedArray(root["call_genotype_phased"], start)
@@ -73,8 +74,7 @@ def convert(
     if variants_chunk_size is None:
         variants_chunk_size = 10_000
 
-    store = zarr.DirectoryStore(zarr_path)
-    root = zarr.group(store=store, overwrite=True)
+    root = zarr.open_group(store=zarr_path, mode="w", **ZARR_FORMAT_KWARGS)
 
     ploidy = 2
     shape = [m, n]
@@ -88,7 +88,8 @@ def convert(
 
     a = root.array(
         "sample_id",
-        bed.iid,
+        data=bed.iid,
+        shape=bed.iid.shape,
         dtype="str",
         compressor=default_compressor,
         chunks=(samples_chunk_size,),
@@ -100,7 +101,8 @@ def convert(
     # fetching repeatedly from bim file
     a = root.array(
         "variant_position",
-        bed.bp_position,
+        data=bed.bp_position,
+        shape=bed.bp_position.shape,
         dtype=np.int32,
         compressor=default_compressor,
         chunks=(variants_chunk_size,),
@@ -111,7 +113,8 @@ def convert(
     alleles = np.stack([bed.allele_1, bed.allele_2], axis=1)
     a = root.array(
         "variant_allele",
-        alleles,
+        data=alleles,
+        shape=alleles.shape,
         dtype="str",
         compressor=default_compressor,
         chunks=(variants_chunk_size,),
@@ -121,31 +124,34 @@ def convert(
 
     # TODO remove this?
     a = root.empty(
-        "call_genotype_phased",
+        name="call_genotype_phased",
         dtype="bool",
         shape=list(shape),
         chunks=list(chunks),
         compressor=default_compressor,
+        **ZARR_FORMAT_KWARGS,
     )
     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)
 
     shape += [ploidy]
     dimensions += ["ploidy"]
     a = root.empty(
-        "call_genotype",
+        name="call_genotype",
         dtype="i1",
         shape=list(shape),
         chunks=list(chunks),
         compressor=default_compressor,
+        **ZARR_FORMAT_KWARGS,
    )
     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)
 
     a = root.empty(
-        "call_genotype_mask",
+        name="call_genotype_mask",
         dtype="bool",
         shape=list(shape),
         chunks=list(chunks),
         compressor=default_compressor,
+        **ZARR_FORMAT_KWARGS,
     )
     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)
 
@@ -154,7 +160,7 @@ def convert(
 
     num_slices = max(1, worker_processes * 4)
     slices = core.chunk_aligned_slices(a, num_slices)
-    total_chunks = sum(a.nchunks for a in root.values())
+    total_chunks = sum(a.nchunks for _, a in root.arrays())
 
     progress_config = core.ProgressConfig(
         total=total_chunks, title="Convert", units="chunks", show=show_progress
@@ -171,8 +177,7 @@ def convert(
 # FIXME do this more efficiently - currently reading the whole thing
 # in for convenience, and also comparing call-by-call
 def validate(bed_path, zarr_path):
-    store = zarr.DirectoryStore(zarr_path)
-    root = zarr.group(store=store)
+    root = zarr.open(store=zarr_path, mode="r")
     call_genotype = root["call_genotype"][:]
 
     bed = bed_reader.open_bed(bed_path, count_A1=False, num_threads=1)
diff --git a/bio2zarr/vcf2zarr/vcz.py b/bio2zarr/vcf2zarr/vcz.py
index 4a502fa8..e776afba 100644
--- a/bio2zarr/vcf2zarr/vcz.py
+++ b/bio2zarr/vcf2zarr/vcz.py
@@ -13,6 +13,8 @@
 import numpy as np
 import zarr
 
+from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS
+
 from .. import constants, core, provenance
 from . import icf
 
@@ -532,8 +534,7 @@ def init(
         )
         self.path.mkdir()
 
-        store = zarr.DirectoryStore(self.path)
-        root = zarr.group(store=store)
+        root = zarr.open(store=self.path, mode="a", **ZARR_FORMAT_KWARGS)
         root.attrs.update(
             {
                 "vcf_zarr_version": "0.2",
@@ -549,8 +550,7 @@
         self.wip_path.mkdir()
         self.arrays_path.mkdir()
         self.partitions_path.mkdir()
-        store = zarr.DirectoryStore(self.arrays_path)
-        root = zarr.group(store=store)
+        root = zarr.open(store=self.arrays_path, mode="a", **ZARR_FORMAT_KWARGS)
 
         total_chunks = 0
         for field in self.schema.fields:
@@ -574,7 +574,8 @@ def encode_samples(self, root):
            raise ValueError("Subsetting or reordering samples not supported currently")
         array = root.array(
             "sample_id",
-            [sample.id for sample in self.schema.samples],
+            data=[sample.id for sample in self.schema.samples],
+            shape=len(self.schema.samples),
             dtype="str",
             compressor=DEFAULT_ZARR_COMPRESSOR,
             chunks=(self.schema.samples_chunk_size,),
@@ -585,7 +586,8 @@ def encode_contig_id(self, root):
         array = root.array(
             "contig_id",
-            [contig.id for contig in self.schema.contigs],
+            data=[contig.id for contig in self.schema.contigs],
+            shape=len(self.schema.contigs),
             dtype="str",
             compressor=DEFAULT_ZARR_COMPRESSOR,
         )
@@ -593,7 +595,8 @@
         if all(contig.length is not None for contig in self.schema.contigs):
             array = root.array(
                 "contig_length",
-                [contig.length for contig in self.schema.contigs],
+                data=[contig.length for contig in self.schema.contigs],
+                shape=len(self.schema.contigs),
                 dtype=np.int64,
                 compressor=DEFAULT_ZARR_COMPRESSOR,
             )
@@ -604,7 +607,8 @@ def encode_filter_id(self, root):
         # https://github.com/sgkit-dev/vcf-zarr-spec/issues/19
         array = root.array(
             "filter_id",
-            [filt.id for filt in self.schema.filters],
+            data=[filt.id for filt in self.schema.filters],
+            shape=len(self.schema.filters),
             dtype="str",
             compressor=DEFAULT_ZARR_COMPRESSOR,
         )
@@ -618,7 +622,7 @@ def init_array(self, root, array_spec, variants_dim_size):
         # Truncate the variants dimension is max_variant_chunks was specified
         shape[0] = variants_dim_size
         a = root.empty(
-            array_spec.name,
+            name=array_spec.name,
             shape=shape,
             chunks=array_spec.chunks,
             dtype=array_spec.dtype,
@@ -626,6 +630,7 @@
             filters=[numcodecs.get_codec(filt) for filt in array_spec.filters],
             object_codec=object_codec,
             dimension_separator=self.metadata.dimension_separator,
+            **ZARR_FORMAT_KWARGS,
         )
         a.attrs.update(
             {
@@ -690,9 +695,7 @@ def init_partition_array(self, partition_index, name):
         # Overwrite any existing WIP files
         wip_path = self.wip_partition_array_path(partition_index, name)
         shutil.copytree(src, wip_path, dirs_exist_ok=True)
-        store = zarr.DirectoryStore(self.wip_partition_path(partition_index))
-        wip_root = zarr.group(store=store)
-        array = wip_root[name]
+        array = zarr.open_array(store=wip_path, mode="a")
         logger.debug(f"Opened empty array {array.name} <{array.dtype}> @ {wip_path}")
         return array
 
@@ -909,8 +912,7 @@ def finalise(self, show_progress=False):
 
     def create_index(self):
         """Create an index to support efficient region queries."""
-        store = zarr.DirectoryStore(self.path)
-        root = zarr.open_group(store=store, mode="r+")
+        root = zarr.open_group(store=self.path, mode="r+")
         contig = root["variant_contig"]
         pos = root["variant_position"]
diff --git a/bio2zarr/vcf2zarr/verification.py b/bio2zarr/vcf2zarr/verification.py
index 27e86fe5..7d1d91c1 100644
--- a/bio2zarr/vcf2zarr/verification.py
+++ b/bio2zarr/vcf2zarr/verification.py
@@ -145,9 +145,7 @@ def assert_format_val_equal(vcf_val, zarr_val, vcf_type, vcf_number):
 
 
 def verify(vcf_path, zarr_path, show_progress=False):
-    store = zarr.DirectoryStore(zarr_path)
-
-    root = zarr.group(store=store)
+    root = zarr.open(store=zarr_path, mode="r")
     pos = root["variant_position"][:]
     allele = root["variant_allele"][:]
     chrom = root["contig_id"][:][root["variant_contig"][:]]
diff --git a/bio2zarr/zarr_utils.py b/bio2zarr/zarr_utils.py
new file mode 100644
index 00000000..16d36e02
--- /dev/null
+++ b/bio2zarr/zarr_utils.py
@@ -0,0 +1,13 @@
+import zarr
+from packaging.version import Version
+
+
+def zarr_v3() -> bool:
+    return Version(zarr.__version__).major >= 3
+
+
+if zarr_v3():
+    # Use zarr format v2 even when running with zarr-python v3
+    ZARR_FORMAT_KWARGS = dict(zarr_format=2)
+else:
+    ZARR_FORMAT_KWARGS = dict()
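
For reference, the intent of the new bio2zarr/zarr_utils.py shim is that callers splat ZARR_FORMAT_KWARGS into zarr open/create calls so data is always written in Zarr v2 format, even when zarr-python v3 is installed. A minimal sketch of the pattern, not taken from the diff; the "example.zarr" path is purely illustrative:

import zarr

from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS

# Under zarr-python v3 this expands to zarr_format=2, pinning the on-disk
# layout to the Zarr v2 format; under zarr-python v2 it is an empty dict
# and has no effect.
root = zarr.open_group(store="example.zarr", mode="w", **ZARR_FORMAT_KWARGS)
root.attrs["vcf_zarr_version"] = "0.2"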