diff --git a/pyproject.toml b/pyproject.toml
index b2e3a06..cd96d31 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ test = [
     "pytest",
     "pytest-asyncio",
     "pytest-xdist",
+    "pytest-mock",
 ]
 doc = ["sphinx>=7.4.6", "myst-parser"]
 dev = [
diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi
index ecbf742..8d5e324 100644
--- a/python/zarrs/_internal.pyi
+++ b/python/zarrs/_internal.pyi
@@ -13,7 +13,8 @@ class Basic:
 class CodecPipelineImpl:
     def __new__(
         cls,
-        metadata: builtins.str,
+        array_metadata: builtins.str,
+        store_config: StoreConfig,
         *,
         validate_checksums: builtins.bool | None = None,
         chunk_concurrent_minimum: builtins.int | None = None,
diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py
index 3705e12..78c64d9 100644
--- a/python/zarrs/pipeline.py
+++ b/python/zarrs/pipeline.py
@@ -2,20 +2,22 @@
 
 import asyncio
 import json
-import re
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, TypedDict
+from warnings import warn
 
 import numpy as np
 from zarr.abc.codec import Codec, CodecPipeline
+from zarr.codecs._v2 import V2Codec
 from zarr.core import BatchedCodecPipeline
 from zarr.core.config import config
+from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata
 
 if TYPE_CHECKING:
     from collections.abc import Generator, Iterable, Iterator
     from typing import Any, Self
 
-    from zarr.abc.store import ByteGetter, ByteSetter
+    from zarr.abc.store import ByteGetter, ByteSetter, Store
     from zarr.core.array_spec import ArraySpec
     from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
     from zarr.core.chunk_grids import ChunkGrid
@@ -40,10 +42,14 @@ class UnsupportedMetadataError(Exception):
     pass
 
 
-def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
+def get_codec_pipeline_impl(
+    metadata: ArrayMetadata, store: Store
+) -> CodecPipelineImpl | None:
     try:
+        array_metadata_json = json.dumps(metadata.to_dict())
         return CodecPipelineImpl(
-            codec_metadata_json,
+            array_metadata_json,
+            store_config=store,
             validate_checksums=config.get("codec_pipeline.validate_checksums", None),
             chunk_concurrent_minimum=config.get(
                 "codec_pipeline.chunk_concurrent_minimum", None
@@ -54,10 +60,11 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
             num_threads=config.get("threading.max_workers", None),
         )
     except TypeError as e:
-        if re.match(r"codec (delta|zlib) is not supported", str(e)):
-            return None
-        else:
-            raise e
+        warn(
+            f"Array is unsupported by ZarrsCodecPipeline: {e}",
+            category=UserWarning,
+        )
+        return None
 
 
 def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
@@ -88,37 +95,47 @@ class ZarrsCodecPipelineState(TypedDict):
     codecs: tuple[Codec, ...]
 
 
+def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]:
+    if isinstance(metadata, ArrayV3Metadata):
+        return metadata.codecs
+    elif isinstance(metadata, ArrayV2Metadata):
+        v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
+        return [v2_codec]
+
+
 @dataclass
 class ZarrsCodecPipeline(CodecPipeline):
-    codecs: tuple[Codec, ...]
+    metadata: ArrayMetadata
+    store: Store
     impl: CodecPipelineImpl | None
-    codec_metadata_json: str
     python_impl: BatchedCodecPipeline
 
     def __getstate__(self) -> ZarrsCodecPipelineState:
-        return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
+        return {"metadata": self.metadata, "store": self.store}
 
     def __setstate__(self, state: ZarrsCodecPipelineState):
-        self.codecs = state["codecs"]
-        self.codec_metadata_json = state["codec_metadata_json"]
-        self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
-        self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)
+        self.metadata = state["metadata"]
+        self.store = state["store"]
+        self.impl = get_codec_pipeline_impl(self.metadata, self.store)
+        codecs = array_metadata_to_codecs(self.metadata)
+        self.python_impl = BatchedCodecPipeline.from_codecs(codecs)
 
     def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
-        raise NotImplementedError("evolve_from_array_spec")
+        return self
 
     @classmethod
     def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
-        codec_metadata = list(codecs_to_dict(codecs))
-        codec_metadata_json = json.dumps(codec_metadata)
-        # TODO: upstream zarr-python has not settled on how to deal with configs yet
-        # Should they be checked when an array is created, or when an operation is performed?
-        # https://github.com/zarr-developers/zarr-python/issues/2409
-        # https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567
+        return BatchedCodecPipeline.from_codecs(codecs)
+
+    @classmethod
+    def from_array_metadata_and_store(
+        cls, array_metadata: ArrayMetadata, store: Store
+    ) -> Self:
+        codecs = array_metadata_to_codecs(array_metadata)
         return cls(
-            codec_metadata_json=codec_metadata_json,
-            codecs=tuple(codecs),
-            impl=get_codec_pipeline_impl(codec_metadata_json),
+            metadata=array_metadata,
+            store=store,
+            impl=get_codec_pipeline_impl(array_metadata, store),
             python_impl=BatchedCodecPipeline.from_codecs(codecs),
         )
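Reviewer note: with the pipeline.py change above, an array whose metadata the Rust pipeline rejects now degrades to zarr-python's `BatchedCodecPipeline` and surfaces the Rust error as a `UserWarning`, instead of matching on `TypeError` message text. A minimal sketch of the user-visible behaviour (the `codec_pipeline.path` setting is the documented way to enable zarrs; array parameters and the `demo.zarr` path are arbitrary):

```python
# Sketch of the new fallback behaviour, not part of this diff.
import numpy as np
import zarr
import zarrs  # noqa: F401

zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})

# Supported metadata: chunk I/O is handled by the Rust CodecPipelineImpl.
z = zarr.create_array("demo.zarr", shape=(8,), chunks=(4,), dtype="uint8")
z[:] = np.arange(8, dtype="uint8")

# Unsupported metadata (e.g. a v2 array using the numcodecs delta filter)
# now emits, once, at pipeline construction:
#   UserWarning: Array is unsupported by ZarrsCodecPipeline: codec delta is not supported
# and routes all I/O through the pure-Python BatchedCodecPipeline.
```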
diff --git a/src/chunk_item.rs b/src/chunk_item.rs
index da0d9f8..c4d592f 100644
--- a/src/chunk_item.rs
+++ b/src/chunk_item.rs
@@ -14,10 +14,9 @@ use zarrs::{
     storage::StoreKey,
 };
 
-use crate::{store::StoreConfig, utils::PyErrExt};
+use crate::utils::PyErrExt;
 
 pub(crate) trait ChunksItem {
-    fn store_config(&self) -> StoreConfig;
     fn key(&self) -> &StoreKey;
     fn representation(&self) -> &ChunkRepresentation;
 }
@@ -26,7 +25,6 @@ pub(crate) trait ChunksItem {
 #[gen_stub_pyclass]
 #[pyclass]
 pub(crate) struct Basic {
-    store: StoreConfig,
     key: StoreKey,
     representation: ChunkRepresentation,
 }
@@ -62,7 +60,6 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<
     fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
-        let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
         let path: String = byte_interface.getattr("path")?.extract()?;
 
         let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
@@ -79,7 +76,6 @@ impl Basic {
         let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
         let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
         Ok(Self {
-            store,
             key: StoreKey::new(path).map_py_err::<PyValueError>()?,
             representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
         })
@@ -118,9 +114,6 @@ impl WithSubset {
 }
 
 impl ChunksItem for Basic {
-    fn store_config(&self) -> StoreConfig {
-        self.store.clone()
-    }
     fn key(&self) -> &StoreKey {
         &self.key
     }
@@ -130,9 +123,6 @@ impl ChunksItem for Basic {
 }
 
 impl ChunksItem for WithSubset {
-    fn store_config(&self) -> StoreConfig {
-        self.item.store.clone()
-    }
     fn key(&self) -> &StoreKey {
         &self.item.key
     }
diff --git a/src/lib.rs b/src/lib.rs
index 543cffb..0fd7553 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -20,14 +20,15 @@ use unsafe_cell_slice::UnsafeCellSlice;
 use utils::is_whole_chunk;
 use zarrs::array::codec::{
     ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder,
+    StoragePartialDecoder,
 };
 use zarrs::array::{
-    copy_fill_value_into, update_array_bytes, ArrayBytes, ArrayBytesFixedDisjointView, ArraySize,
-    CodecChain, FillValue,
+    copy_fill_value_into, update_array_bytes, Array, ArrayBytes, ArrayBytesFixedDisjointView,
+    ArrayMetadata, ArraySize, CodecChain, FillValue,
 };
 use zarrs::array_subset::ArraySubset;
-use zarrs::metadata::v3::MetadataV3;
-use zarrs::storage::StoreKey;
+use zarrs::storage::store::MemoryStore;
+use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};
 
 mod chunk_item;
 mod concurrency;
@@ -41,14 +42,14 @@ mod utils;
 use crate::chunk_item::ChunksItem;
 use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
 use crate::metadata_v2::codec_metadata_v2_to_v3;
-use crate::store::StoreManager;
+use crate::store::StoreConfig;
 use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _};
 
 // TODO: Use a OnceLock for store with get_or_try_init when stabilised?
 #[gen_stub_pyclass]
 #[pyclass]
 pub struct CodecPipelineImpl {
-    pub(crate) stores: StoreManager,
+    pub(crate) store: ReadableWritableListableStorage,
     pub(crate) codec_chain: Arc<CodecChain>,
     pub(crate) codec_options: CodecOptions,
     pub(crate) chunk_concurrent_minimum: usize,
@@ -63,7 +64,7 @@ impl CodecPipelineImpl {
         codec_chain: &CodecChain,
         codec_options: &CodecOptions,
     ) -> PyResult<ArrayBytes<'a>> {
-        let value_encoded = self.stores.get(item)?;
+        let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
         let value_decoded = if let Some(value_encoded) = value_encoded {
             let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
             codec_chain
@@ -94,7 +95,7 @@ impl CodecPipelineImpl {
             .map_py_err::<PyValueError>()?;
 
         if value_decoded.is_fill_value(item.representation().fill_value()) {
-            self.stores.erase(item)
+            self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
         } else {
             let value_encoded = codec_chain
                 .encode(value_decoded, item.representation(), codec_options)
@@ -102,7 +103,9 @@ impl CodecPipelineImpl {
                 .map_py_err::<PyRuntimeError>()?;
 
             // Store the encoded chunk
-            self.stores.set(item, value_encoded.into())
+            self.store
+                .set(item.key(), value_encoded.into())
+                .map_py_err::<PyRuntimeError>()
         }
     }
@@ -204,7 +207,8 @@ impl CodecPipelineImpl {
 #[pymethods]
 impl CodecPipelineImpl {
     #[pyo3(signature = (
-        metadata,
+        array_metadata,
+        store_config,
         *,
         validate_checksums=None,
         chunk_concurrent_minimum=None,
@@ -213,16 +217,21 @@ impl CodecPipelineImpl {
     ))]
     #[new]
     fn new(
-        metadata: &str,
+        array_metadata: &str,
+        store_config: StoreConfig,
         validate_checksums: Option<bool>,
         chunk_concurrent_minimum: Option<usize>,
         chunk_concurrent_maximum: Option<usize>,
         num_threads: Option<usize>,
     ) -> PyResult<Self> {
-        let metadata: Vec<MetadataV3> =
-            serde_json::from_str(metadata).map_py_err::<PyTypeError>()?;
-        let codec_chain =
-            Arc::new(CodecChain::from_metadata(&metadata).map_py_err::<PyTypeError>()?);
+        let metadata: ArrayMetadata =
+            serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;
+
+        // TODO: Add a direct metadata -> codec chain method to zarrs
+        let store = Arc::new(MemoryStore::new());
+        let array = Array::new_with_metadata(store, "/", metadata).map_py_err::<PyTypeError>()?;
+        let codec_chain = Arc::new(array.codecs().clone());
+
         let mut codec_options = CodecOptionsBuilder::new();
         if let Some(validate_checksums) = validate_checksums {
             codec_options = codec_options.validate_checksums(validate_checksums);
@@ -235,8 +244,11 @@ impl CodecPipelineImpl {
             chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
         let num_threads = num_threads.unwrap_or(rayon::current_num_threads());
 
+        let store: ReadableWritableListableStorage =
+            (&store_config).try_into().map_py_err::<PyTypeError>()?;
+
         Ok(Self {
-            stores: StoreManager::default(),
+            store,
             codec_chain,
             codec_options,
             chunk_concurrent_minimum,
@@ -276,7 +288,9 @@ impl CodecPipelineImpl {
             partial_chunk_descriptions,
             map,
             |item| {
-                let input_handle = self.stores.decoder(item)?;
+                let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
+                let input_handle =
+                    StoragePartialDecoder::new(storage_handle, item.key().clone());
                 let partial_decoder = self
                     .codec_chain
                     .clone()
                     .partial_decoder(
@@ -326,7 +340,9 @@ impl CodecPipelineImpl {
                     && chunk_subset.shape() == item.representation().shape_u64()
                 {
                     // See zarrs::array::Array::retrieve_chunk_into
-                    if let Some(chunk_encoded) = self.stores.get(&item)? {
+                    if let Some(chunk_encoded) =
+                        self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
+                    {
                         // Decode the encoded data into the output buffer
                         let chunk_encoded: Vec<u8> = chunk_encoded.into();
                         self.codec_chain.decode_into(
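Reviewer note: the Rust constructor now takes the full array metadata document plus a store config, mirroring `get_codec_pipeline_impl` in pipeline.py. A hedged sketch of driving that boundary directly — `CodecPipelineImpl` is a private API and subject to change; keyword names follow the updated `_internal.pyi`, and the `scratch.zarr` path is arbitrary:

```python
# Hedged sketch: constructing the Rust impl by hand, as pipeline.py does.
import json

import numpy as np
import zarr
from zarrs._internal import CodecPipelineImpl

z = zarr.create_array("scratch.zarr", shape=(10, 10), chunks=(5, 5), dtype=np.uint16)
impl = CodecPipelineImpl(
    json.dumps(z.metadata.to_dict()),  # full array metadata, not just the codec list
    store_config=z.store,  # extracted into a Rust StoreConfig on the way in
    num_threads=1,  # remaining options are keyword-only and optional
)
```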
diff --git a/src/store.rs b/src/store.rs
index 8fc3ee6..8c6b206 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -15,13 +15,11 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt};
 
 mod filesystem;
 mod http;
-mod manager;
 
 pub use self::filesystem::FilesystemStoreConfig;
 pub use self::http::HttpStoreConfig;
-pub(crate) use self::manager::StoreManager;
 
-#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone)]
 #[gen_stub_pyclass_enum]
 pub enum StoreConfig {
     Filesystem(FilesystemStoreConfig),
diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs
index 8ee865b..9bf5b16 100644
--- a/src/store/filesystem.rs
+++ b/src/store/filesystem.rs
@@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag
 
 use crate::utils::PyErrExt;
 
-#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone)]
 #[gen_stub_pyclass]
 #[pyclass]
 pub struct FilesystemStoreConfig {
diff --git a/src/store/http.rs b/src/store/http.rs
index 0c7820b..725d5a4 100644
--- a/src/store/http.rs
+++ b/src/store/http.rs
@@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage;
 
 use super::opendal_builder_to_sync_store;
 
-#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone)]
 #[gen_stub_pyclass]
 #[pyclass]
 pub struct HttpStoreConfig {
diff --git a/src/store/manager.rs b/src/store/manager.rs
deleted file mode 100644
index d32a68d..0000000
--- a/src/store/manager.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use std::{
-    collections::BTreeMap,
-    sync::{Arc, Mutex},
-};
-
-use pyo3::{exceptions::PyRuntimeError, PyResult};
-use zarrs::{
-    array::codec::StoragePartialDecoder,
-    storage::{Bytes, MaybeBytes, ReadableWritableListableStorage, StorageHandle},
-};
-
-use crate::{chunk_item::ChunksItem, store::PyErrExt as _};
-
-use super::StoreConfig;
-
-#[derive(Default)]
-pub(crate) struct StoreManager(Mutex<BTreeMap<StoreConfig, ReadableWritableListableStorage>>);
-
-impl StoreManager {
-    fn store<I: ChunksItem>(&self, item: &I) -> PyResult<ReadableWritableListableStorage> {
-        use std::collections::btree_map::Entry::{Occupied, Vacant};
-        match self
-            .0
-            .lock()
-            .map_py_err::<PyRuntimeError>()?
-            .entry(item.store_config())
-        {
-            Occupied(e) => Ok(e.get().clone()),
-            Vacant(e) => Ok(e.insert((&item.store_config()).try_into()?).clone()),
-        }
-    }
-
-    pub(crate) fn get<I: ChunksItem>(&self, item: &I) -> PyResult<MaybeBytes> {
-        self.store(item)?
-            .get(item.key())
-            .map_py_err::<PyRuntimeError>()
-    }
-
-    pub(crate) fn set<I: ChunksItem>(&self, item: &I, value: Bytes) -> PyResult<()> {
-        self.store(item)?
-            .set(item.key(), value)
-            .map_py_err::<PyRuntimeError>()
-    }
-
-    pub(crate) fn erase<I: ChunksItem>(&self, item: &I) -> PyResult<()> {
-        self.store(item)?
-            .erase(item.key())
-            .map_py_err::<PyRuntimeError>()
-    }
-
-    pub(crate) fn decoder<I: ChunksItem>(&self, item: &I) -> PyResult<StoragePartialDecoder> {
-        // Partially decode the chunk into the output buffer
-        let storage_handle = Arc::new(StorageHandle::new(self.store(item)?));
-        // NOTE: Normally a storage transformer would exist between the storage handle and the input handle
-        // but zarr-python does not support them nor forward them to the codec pipeline
-        Ok(StoragePartialDecoder::new(
-            storage_handle,
-            item.key().clone(),
-        ))
-    }
-}
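Reviewer note: with `StoreManager` deleted, each pipeline owns exactly one store resolved at construction time instead of a `BTreeMap` cache keyed by per-chunk `StoreConfig` — which is also why the `Hash`/`Eq`/`Ord` derives on the config types could be dropped. One consequence worth exercising is pickling: `__getstate__`/`__setstate__` in pipeline.py now round-trip metadata plus store, so the Rust impl can be rebuilt after a fork or in a worker process. A hedged sketch (the `state.zarr` path is arbitrary):

```python
# Hedged sketch: pipeline state is now just (metadata, store); the Rust
# impl is re-created inside __setstate__ on unpickling.
import pickle

import zarr
import zarrs  # noqa: F401

zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})
z = zarr.create_array("state.zarr", shape=(20,), chunks=(10,), dtype="uint16")

pipeline = z._async_array.codec_pipeline
restored = pickle.loads(pickle.dumps(pipeline))
assert restored.metadata == pipeline.metadata  # impl rebuilt from state
```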
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 70d5ffc..215fc15 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -256,3 +256,28 @@ def test_roundtrip_read_only_zarrs(
     assert np.all(
         res == store_values,
     ), res
+
+
+@pytest.mark.parametrize(
+    "codec",
+    [zarr.codecs.BloscCodec(), zarr.codecs.GzipCodec(), zarr.codecs.ZstdCodec()],
+)
+@pytest.mark.parametrize("should_shard", [True, False])
+def test_pipeline_used(
+    mocker, codec: zarr.abc.codec.BaseCodec, tmp_path: Path, *, should_shard: bool
+):
+    z = zarr.create_array(
+        tmp_path / "foo.zarr",
+        dtype=np.uint16,
+        shape=(80, 100),
+        chunks=(10, 10),
+        shards=(20, 20) if should_shard else None,
+        compressors=[codec],
+    )
+    spy_read = mocker.spy(z._async_array.codec_pipeline, "read")
+    spy_write = mocker.spy(z._async_array.codec_pipeline, "write")
+    assert isinstance(z._async_array.codec_pipeline, zarrs.ZarrsCodecPipeline)
+    z[...] = np.random.random(z.shape)
+    z[...]
+    assert spy_read.call_count == 1
+    assert spy_write.call_count == 1
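A note on the `filterwarnings` markers added in the test diffs below: pytest treats the message part of an `ignore:<message>:<category>` entry as a regular expression applied with `re.match`. That is why the patterns use `.` where the emitted message has a literal `:` after `ZarrsCodecPipeline`, and why spans like `..+.` and `.eAAAAAAAAA==.` stand in for quoted fill values. Even the unescaped `|` in `|S1` is harmless: it splits the pattern into two alternatives, and the left alternative still matches the message prefix. For example:

```python
# How the ignore patterns line up with the warning emitted by
# get_codec_pipeline_impl in pipeline.py.
import re

message = "Array is unsupported by ZarrsCodecPipeline: codec delta is not supported"
pattern = "Array is unsupported by ZarrsCodecPipeline. codec delta is not supported"
assert re.match(pattern, message)  # the "." matches the literal ":"
```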
diff --git a/tests/test_v2.py b/tests/test_v2.py
index b5eb279..43de1d0 100644
--- a/tests/test_v2.py
+++ b/tests/test_v2.py
@@ -42,6 +42,9 @@ def test_simple(store: StorePath) -> None:
     assert np.array_equal(data, a[:, :])
 
 
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. codec delta is not supported:UserWarning"
+)
 def test_fill_single_value(store: Store) -> None:
     array = zarr.create(
         store=store,
@@ -57,6 +60,13 @@ def test_fill_single_value(store: Store) -> None:
     np.testing.assert_array_equal(result, expected)
 
 
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning"
+)
+@pytest.mark.filterwarnings(
+    # TODO: Fix handling of string fill values for Zarr v2 bytes data
+    "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value ..+. for data type bytes:UserWarning"
+)
 @pytest.mark.parametrize(
     ("dtype", "expected_dtype", "fill_value", "fill_value_json"),
     [
@@ -100,6 +110,12 @@ async def test_v2_encode_decode(
     np.testing.assert_equal(data, np.full((3,), b"X", dtype=dtype))
 
 
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. data type |U1 is not supported:UserWarning"
+)
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning"
+)
 @pytest.mark.parametrize(
     ("dtype", "value"),
     [
@@ -108,9 +124,7 @@ async def test_v2_encode_decode(
         (VariableLengthUTF8(), "Y"),
     ],
 )
-def test_v2_encode_decode_with_data(
-    dtype: ZDType[Any, Any], value: str, tmp_path: Path
-):
+def test_v2_encode_decode_with_data(dtype: ZDType[Any, Any], value: str, tmp_path):
     expected = np.full((3,), value, dtype=dtype.to_native_dtype())
     a = zarr.create(
         store=tmp_path,
@@ -123,6 +137,9 @@ def test_v2_encode_decode_with_data(dtype: ZDType[Any, Any], value: str, tmp_pa
     np.testing.assert_equal(data, expected)
 
 
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. codec delta is not supported:UserWarning"
+)
 @pytest.mark.parametrize(
     "filters", [[], [numcodecs.Delta(dtype="<i4")]]
 )
 def test_v2_filters(filters: list[numcodecs.abc.Codec], tmp_path: Path) -> None:
     a = np.array(
@@ -280,6 +301,14 @@ def test_parse_structured_fill_value_valid(
     assert result[name] == expected_result[name]
 
 
+@pytest.mark.filterwarnings(
+    # TODO: Permit this in zarrs?
+    "ignore:Array is unsupported by ZarrsCodecPipeline. unsupported Zarr V2 array. unsupported fill value Null for data type bytes:UserWarning"
+)
+@pytest.mark.filterwarnings(
+    # TODO: Fix handling of string fill values for Zarr v2 bytes data
+    "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value .eAAAAAAAAA==. for data type bytes:UserWarning"
+)
 @pytest.mark.parametrize("fill_value", [None, b"x"], ids=["no_fill", "fill"])
 def test_other_dtype_roundtrip(fill_value, tmp_path) -> None:
     a = np.array([b"a\0\0", b"bb", b"ccc"], dtype="V7")
diff --git a/tests/test_vlen.py b/tests/test_vlen.py
index b9e29eb..2c4ffaa 100644
--- a/tests/test_vlen.py
+++ b/tests/test_vlen.py
@@ -28,6 +28,15 @@ expected_array_string_dtype = np.dtype("O")
 
 
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. data type fixed_length_utf32 is not supported:UserWarning"
+)
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. data type null_terminated_bytes is not supported:UserWarning"
+)
+@pytest.mark.filterwarnings(
+    "ignore:Array is unsupported by ZarrsCodecPipeline. data type fixed_length_utf32 is not supported:UserWarning"
+)
 @pytest.mark.filterwarnings(
     "ignore::zarr.core.dtype.common.UnstableSpecificationWarning"
 )
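Finally, the `pytest-mock` dependency added to the `test` extras exists for the `mocker.spy` calls in `test_pipeline_used` above: spying wraps a bound method in place and records calls without changing behaviour, which is what lets the test assert the zarrs pipeline (and not a fallback) actually served the read and write. A minimal self-contained sketch of that pattern:

```python
# Minimal sketch of the mocker.spy pattern used by test_pipeline_used
# (requires the pytest-mock plugin; names here are illustrative only).
def test_spy_counts_calls(mocker):
    class Pipeline:
        def read(self):
            return "data"

    p = Pipeline()
    spy = mocker.spy(p, "read")
    assert p.read() == "data"  # the call passes through to the real method
    assert spy.call_count == 1
```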