Skip to content

Commit 53c2224

Browse files
LDeakin and ilan-gold authored
feat: initialise from ArrayMetadata and store (#104)
* (chore): handle new zarr dtype API
* (fix): dep in pyproject toml for zarr from main
* (fix): use local store for v2
* fix: don't use `default_filters`/`default_compressor`
* feat: initialise from `ArrayMetadata` and store
* Apply suggestion from @ilan-gold

Co-authored-by: Ilan Gold <[email protected]>

* chore: add spy for pipeline (#105)

---------

Co-authored-by: ilan-gold <[email protected]>
1 parent e7e88ed commit 53c2224

File tree

12 files changed: +149 -124 lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ test = [
3131
"pytest",
3232
"pytest-asyncio",
3333
"pytest-xdist",
34+
"pytest-mock",
3435
]
3536
doc = ["sphinx>=7.4.6", "myst-parser"]
3637
dev = [

python/zarrs/_internal.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ class Basic:
1313
class CodecPipelineImpl:
1414
def __new__(
1515
cls,
16-
metadata: builtins.str,
16+
array_metadata: builtins.str,
17+
store_config: StoreConfig,
1718
*,
1819
validate_checksums: builtins.bool | None = None,
1920
chunk_concurrent_minimum: builtins.int | None = None,

python/zarrs/pipeline.py

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,22 @@
22

33
import asyncio
44
import json
5-
import re
65
from dataclasses import dataclass
76
from typing import TYPE_CHECKING, TypedDict
7+
from warnings import warn
88

99
import numpy as np
1010
from zarr.abc.codec import Codec, CodecPipeline
11+
from zarr.codecs._v2 import V2Codec
1112
from zarr.core import BatchedCodecPipeline
1213
from zarr.core.config import config
14+
from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata
1315

1416
if TYPE_CHECKING:
1517
from collections.abc import Generator, Iterable, Iterator
1618
from typing import Any, Self
1719

18-
from zarr.abc.store import ByteGetter, ByteSetter
20+
from zarr.abc.store import ByteGetter, ByteSetter, Store
1921
from zarr.core.array_spec import ArraySpec
2022
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
2123
from zarr.core.chunk_grids import ChunkGrid
@@ -40,10 +42,14 @@ class UnsupportedMetadataError(Exception):
4042
pass
4143

4244

43-
def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
45+
def get_codec_pipeline_impl(
46+
metadata: ArrayMetadata, store: Store
47+
) -> CodecPipelineImpl | None:
4448
try:
49+
array_metadata_json = json.dumps(metadata.to_dict())
4550
return CodecPipelineImpl(
46-
codec_metadata_json,
51+
array_metadata_json,
52+
store_config=store,
4753
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
4854
chunk_concurrent_minimum=config.get(
4955
"codec_pipeline.chunk_concurrent_minimum", None
@@ -54,10 +60,11 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
5460
num_threads=config.get("threading.max_workers", None),
5561
)
5662
except TypeError as e:
57-
if re.match(r"codec (delta|zlib) is not supported", str(e)):
58-
return None
59-
else:
60-
raise e
63+
warn(
64+
f"Array is unsupported by ZarrsCodecPipeline: {e}",
65+
category=UserWarning,
66+
)
67+
return None
6168

6269

6370
def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
@@ -88,37 +95,47 @@ class ZarrsCodecPipelineState(TypedDict):
8895
codecs: tuple[Codec, ...]
8996

9097

98+
def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]:
99+
if isinstance(metadata, ArrayV3Metadata):
100+
return metadata.codecs
101+
elif isinstance(metadata, ArrayV2Metadata):
102+
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
103+
return [v2_codec]
104+
105+
91106
@dataclass
92107
class ZarrsCodecPipeline(CodecPipeline):
93-
codecs: tuple[Codec, ...]
108+
metadata: ArrayMetadata
109+
store: Store
94110
impl: CodecPipelineImpl | None
95-
codec_metadata_json: str
96111
python_impl: BatchedCodecPipeline
97112

98113
def __getstate__(self) -> ZarrsCodecPipelineState:
99-
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
114+
return {"metadata": self.metadata, "store": self.store}
100115

101116
def __setstate__(self, state: ZarrsCodecPipelineState):
102-
self.codecs = state["codecs"]
103-
self.codec_metadata_json = state["codec_metadata_json"]
104-
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
105-
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)
117+
self.metadata = state["metadata"]
118+
self.store = state["store"]
119+
self.impl = get_codec_pipeline_impl(self.metadata, self.store)
120+
codecs = array_metadata_to_codecs(self.metadata)
121+
self.python_impl = BatchedCodecPipeline.from_codecs(codecs)
106122

107123
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
108-
raise NotImplementedError("evolve_from_array_spec")
124+
return self
109125

110126
@classmethod
111127
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
112-
codec_metadata = list(codecs_to_dict(codecs))
113-
codec_metadata_json = json.dumps(codec_metadata)
114-
# TODO: upstream zarr-python has not settled on how to deal with configs yet
115-
# Should they be checked when an array is created, or when an operation is performed?
116-
# https://github.com/zarr-developers/zarr-python/issues/2409
117-
# https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567
128+
return BatchedCodecPipeline.from_codecs(codecs)
129+
130+
@classmethod
131+
def from_array_metadata_and_store(
132+
cls, array_metadata: ArrayMetadata, store: Store
133+
) -> Self:
134+
codecs = array_metadata_to_codecs(array_metadata)
118135
return cls(
119-
codec_metadata_json=codec_metadata_json,
120-
codecs=tuple(codecs),
121-
impl=get_codec_pipeline_impl(codec_metadata_json),
136+
metadata=array_metadata,
137+
store=store,
138+
impl=get_codec_pipeline_impl(array_metadata, store),
122139
python_impl=BatchedCodecPipeline.from_codecs(codecs),
123140
)
124141

src/chunk_item.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@ use zarrs::{
1414
storage::StoreKey,
1515
};
1616

17-
use crate::{store::StoreConfig, utils::PyErrExt};
17+
use crate::utils::PyErrExt;
1818

1919
pub(crate) trait ChunksItem {
20-
fn store_config(&self) -> StoreConfig;
2120
fn key(&self) -> &StoreKey;
2221
fn representation(&self) -> &ChunkRepresentation;
2322
}
@@ -26,7 +25,6 @@ pub(crate) trait ChunksItem {
2625
#[gen_stub_pyclass]
2726
#[pyclass]
2827
pub(crate) struct Basic {
29-
store: StoreConfig,
3028
key: StoreKey,
3129
representation: ChunkRepresentation,
3230
}
@@ -62,7 +60,6 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<V
6260
impl Basic {
6361
#[new]
6462
fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
65-
let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
6663
let path: String = byte_interface.getattr("path")?.extract()?;
6764

6865
let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
@@ -79,7 +76,6 @@ impl Basic {
7976
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
8077
let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
8178
Ok(Self {
82-
store,
8379
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
8480
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
8581
})
@@ -118,9 +114,6 @@ impl WithSubset {
118114
}
119115

120116
impl ChunksItem for Basic {
121-
fn store_config(&self) -> StoreConfig {
122-
self.store.clone()
123-
}
124117
fn key(&self) -> &StoreKey {
125118
&self.key
126119
}
@@ -130,9 +123,6 @@ impl ChunksItem for Basic {
130123
}
131124

132125
impl ChunksItem for WithSubset {
133-
fn store_config(&self) -> StoreConfig {
134-
self.item.store.clone()
135-
}
136126
fn key(&self) -> &StoreKey {
137127
&self.item.key
138128
}

src/lib.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ use unsafe_cell_slice::UnsafeCellSlice;
2020
use utils::is_whole_chunk;
2121
use zarrs::array::codec::{
2222
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder,
23+
StoragePartialDecoder,
2324
};
2425
use zarrs::array::{
25-
copy_fill_value_into, update_array_bytes, ArrayBytes, ArrayBytesFixedDisjointView, ArraySize,
26-
CodecChain, FillValue,
26+
copy_fill_value_into, update_array_bytes, Array, ArrayBytes, ArrayBytesFixedDisjointView,
27+
ArrayMetadata, ArraySize, CodecChain, FillValue,
2728
};
2829
use zarrs::array_subset::ArraySubset;
29-
use zarrs::metadata::v3::MetadataV3;
30-
use zarrs::storage::StoreKey;
30+
use zarrs::storage::store::MemoryStore;
31+
use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};
3132

3233
mod chunk_item;
3334
mod concurrency;
@@ -41,14 +42,14 @@ mod utils;
4142
use crate::chunk_item::ChunksItem;
4243
use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
4344
use crate::metadata_v2::codec_metadata_v2_to_v3;
44-
use crate::store::StoreManager;
45+
use crate::store::StoreConfig;
4546
use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _};
4647

4748
// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
4849
#[gen_stub_pyclass]
4950
#[pyclass]
5051
pub struct CodecPipelineImpl {
51-
pub(crate) stores: StoreManager,
52+
pub(crate) store: ReadableWritableListableStorage,
5253
pub(crate) codec_chain: Arc<CodecChain>,
5354
pub(crate) codec_options: CodecOptions,
5455
pub(crate) chunk_concurrent_minimum: usize,
@@ -63,7 +64,7 @@ impl CodecPipelineImpl {
6364
codec_chain: &CodecChain,
6465
codec_options: &CodecOptions,
6566
) -> PyResult<ArrayBytes<'a>> {
66-
let value_encoded = self.stores.get(item)?;
67+
let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
6768
let value_decoded = if let Some(value_encoded) = value_encoded {
6869
let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
6970
codec_chain
@@ -94,15 +95,17 @@ impl CodecPipelineImpl {
9495
.map_py_err::<PyValueError>()?;
9596

9697
if value_decoded.is_fill_value(item.representation().fill_value()) {
97-
self.stores.erase(item)
98+
self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
9899
} else {
99100
let value_encoded = codec_chain
100101
.encode(value_decoded, item.representation(), codec_options)
101102
.map(Cow::into_owned)
102103
.map_py_err::<PyRuntimeError>()?;
103104

104105
// Store the encoded chunk
105-
self.stores.set(item, value_encoded.into())
106+
self.store
107+
.set(item.key(), value_encoded.into())
108+
.map_py_err::<PyRuntimeError>()
106109
}
107110
}
108111

@@ -204,7 +207,8 @@ impl CodecPipelineImpl {
204207
#[pymethods]
205208
impl CodecPipelineImpl {
206209
#[pyo3(signature = (
207-
metadata,
210+
array_metadata,
211+
store_config,
208212
*,
209213
validate_checksums=None,
210214
chunk_concurrent_minimum=None,
@@ -213,16 +217,21 @@ impl CodecPipelineImpl {
213217
))]
214218
#[new]
215219
fn new(
216-
metadata: &str,
220+
array_metadata: &str,
221+
store_config: StoreConfig,
217222
validate_checksums: Option<bool>,
218223
chunk_concurrent_minimum: Option<usize>,
219224
chunk_concurrent_maximum: Option<usize>,
220225
num_threads: Option<usize>,
221226
) -> PyResult<Self> {
222-
let metadata: Vec<MetadataV3> =
223-
serde_json::from_str(metadata).map_py_err::<PyTypeError>()?;
224-
let codec_chain =
225-
Arc::new(CodecChain::from_metadata(&metadata).map_py_err::<PyTypeError>()?);
227+
let metadata: ArrayMetadata =
228+
serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;
229+
230+
// TODO: Add a direct metadata -> codec chain method to zarrs
231+
let store = Arc::new(MemoryStore::new());
232+
let array = Array::new_with_metadata(store, "/", metadata).map_py_err::<PyTypeError>()?;
233+
let codec_chain = Arc::new(array.codecs().clone());
234+
226235
let mut codec_options = CodecOptionsBuilder::new();
227236
if let Some(validate_checksums) = validate_checksums {
228237
codec_options = codec_options.validate_checksums(validate_checksums);
@@ -235,8 +244,11 @@ impl CodecPipelineImpl {
235244
chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
236245
let num_threads = num_threads.unwrap_or(rayon::current_num_threads());
237246

247+
let store: ReadableWritableListableStorage =
248+
(&store_config).try_into().map_py_err::<PyTypeError>()?;
249+
238250
Ok(Self {
239-
stores: StoreManager::default(),
251+
store,
240252
codec_chain,
241253
codec_options,
242254
chunk_concurrent_minimum,
@@ -276,7 +288,9 @@ impl CodecPipelineImpl {
276288
partial_chunk_descriptions,
277289
map,
278290
|item| {
279-
let input_handle = self.stores.decoder(item)?;
291+
let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
292+
let input_handle =
293+
StoragePartialDecoder::new(storage_handle, item.key().clone());
280294
let partial_decoder = self
281295
.codec_chain
282296
.clone()
@@ -326,7 +340,9 @@ impl CodecPipelineImpl {
326340
&& chunk_subset.shape() == item.representation().shape_u64()
327341
{
328342
// See zarrs::array::Array::retrieve_chunk_into
329-
if let Some(chunk_encoded) = self.stores.get(&item)? {
343+
if let Some(chunk_encoded) =
344+
self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
345+
{
330346
// Decode the encoded data into the output buffer
331347
let chunk_encoded: Vec<u8> = chunk_encoded.into();
332348
self.codec_chain.decode_into(

src/store.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt};
1515

1616
mod filesystem;
1717
mod http;
18-
mod manager;
1918

2019
pub use self::filesystem::FilesystemStoreConfig;
2120
pub use self::http::HttpStoreConfig;
22-
pub(crate) use self::manager::StoreManager;
2321

24-
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
22+
#[derive(Debug, Clone)]
2523
#[gen_stub_pyclass_enum]
2624
pub enum StoreConfig {
2725
Filesystem(FilesystemStoreConfig),

src/store/filesystem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag
66

77
use crate::utils::PyErrExt;
88

9-
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
9+
#[derive(Debug, Clone)]
1010
#[gen_stub_pyclass]
1111
#[pyclass]
1212
pub struct FilesystemStoreConfig {

src/store/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage;
66

77
use super::opendal_builder_to_sync_store;
88

9-
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
9+
#[derive(Debug, Clone)]
1010
#[gen_stub_pyclass]
1111
#[pyclass]
1212
pub struct HttpStoreConfig {

0 commit comments

Comments
 (0)