Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ test = [
"pytest",
"pytest-asyncio",
"pytest-xdist",
"pytest-mock",
]
doc = ["sphinx>=7.4.6", "myst-parser"]
dev = [
Expand Down
3 changes: 2 additions & 1 deletion python/zarrs/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class Basic:
class CodecPipelineImpl:
def __new__(
cls,
metadata: builtins.str,
array_metadata: builtins.str,
store_config: StoreConfig,
*,
validate_checksums: builtins.bool | None = None,
chunk_concurrent_minimum: builtins.int | None = None,
Expand Down
67 changes: 42 additions & 25 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@

import asyncio
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypedDict
from warnings import warn

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.codecs._v2 import V2Codec
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata

if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.abc.store import ByteGetter, ByteSetter, Store
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.chunk_grids import ChunkGrid
Expand All @@ -40,10 +42,14 @@ class UnsupportedMetadataError(Exception):
pass


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
def get_codec_pipeline_impl(
metadata: ArrayMetadata, store: Store
) -> CodecPipelineImpl | None:
try:
array_metadata_json = json.dumps(metadata.to_dict())
return CodecPipelineImpl(
codec_metadata_json,
array_metadata_json,
store_config=store,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
Expand All @@ -54,10 +60,11 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
num_threads=config.get("threading.max_workers", None),
)
except TypeError as e:
if re.match(r"codec (delta|zlib) is not supported", str(e)):
return None
else:
raise e
warn(
f"Array is unsupported by ZarrsCodecPipeline: {e}",
category=UserWarning,
)
return None


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
Expand Down Expand Up @@ -88,37 +95,47 @@ class ZarrsCodecPipelineState(TypedDict):
codecs: tuple[Codec, ...]


def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]:
if isinstance(metadata, ArrayV3Metadata):
return metadata.codecs
elif isinstance(metadata, ArrayV2Metadata):
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
return [v2_codec]


@dataclass
class ZarrsCodecPipeline(CodecPipeline):
codecs: tuple[Codec, ...]
metadata: ArrayMetadata
store: Store
impl: CodecPipelineImpl | None
codec_metadata_json: str
python_impl: BatchedCodecPipeline

def __getstate__(self) -> ZarrsCodecPipelineState:
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
return {"metadata": self.metadata, "store": self.store}

def __setstate__(self, state: ZarrsCodecPipelineState):
self.codecs = state["codecs"]
self.codec_metadata_json = state["codec_metadata_json"]
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)
self.metadata = state["metadata"]
self.store = state["store"]
self.impl = get_codec_pipeline_impl(self.metadata, self.store)
codecs = array_metadata_to_codecs(self.metadata)
self.python_impl = BatchedCodecPipeline.from_codecs(codecs)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
raise NotImplementedError("evolve_from_array_spec")
return self

@classmethod
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata = list(codecs_to_dict(codecs))
codec_metadata_json = json.dumps(codec_metadata)
# TODO: upstream zarr-python has not settled on how to deal with configs yet
# Should they be checked when an array is created, or when an operation is performed?
# https://github.com/zarr-developers/zarr-python/issues/2409
# https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567
return BatchedCodecPipeline.from_codecs(codecs)

@classmethod
def from_array_metadata_and_store(
cls, array_metadata: ArrayMetadata, store: Store
) -> Self:
codecs = array_metadata_to_codecs(array_metadata)
return cls(
codec_metadata_json=codec_metadata_json,
codecs=tuple(codecs),
impl=get_codec_pipeline_impl(codec_metadata_json),
metadata=array_metadata,
store=store,
impl=get_codec_pipeline_impl(array_metadata, store),
python_impl=BatchedCodecPipeline.from_codecs(codecs),
)

Expand Down
12 changes: 1 addition & 11 deletions src/chunk_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ use zarrs::{
storage::StoreKey,
};

use crate::{store::StoreConfig, utils::PyErrExt};
use crate::utils::PyErrExt;

pub(crate) trait ChunksItem {
fn store_config(&self) -> StoreConfig;
fn key(&self) -> &StoreKey;
fn representation(&self) -> &ChunkRepresentation;
}
Expand All @@ -26,7 +25,6 @@ pub(crate) trait ChunksItem {
#[gen_stub_pyclass]
#[pyclass]
pub(crate) struct Basic {
store: StoreConfig,
key: StoreKey,
representation: ChunkRepresentation,
}
Expand Down Expand Up @@ -62,7 +60,6 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<V
impl Basic {
#[new]
fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
let path: String = byte_interface.getattr("path")?.extract()?;

let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
Expand All @@ -79,7 +76,6 @@ impl Basic {
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
Ok(Self {
store,
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
})
Expand Down Expand Up @@ -118,9 +114,6 @@ impl WithSubset {
}

impl ChunksItem for Basic {
fn store_config(&self) -> StoreConfig {
self.store.clone()
}
fn key(&self) -> &StoreKey {
&self.key
}
Expand All @@ -130,9 +123,6 @@ impl ChunksItem for Basic {
}

impl ChunksItem for WithSubset {
fn store_config(&self) -> StoreConfig {
self.item.store.clone()
}
fn key(&self) -> &StoreKey {
&self.item.key
}
Expand Down
52 changes: 34 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ use unsafe_cell_slice::UnsafeCellSlice;
use utils::is_whole_chunk;
use zarrs::array::codec::{
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder,
StoragePartialDecoder,
};
use zarrs::array::{
copy_fill_value_into, update_array_bytes, ArrayBytes, ArrayBytesFixedDisjointView, ArraySize,
CodecChain, FillValue,
copy_fill_value_into, update_array_bytes, Array, ArrayBytes, ArrayBytesFixedDisjointView,
ArrayMetadata, ArraySize, CodecChain, FillValue,
};
use zarrs::array_subset::ArraySubset;
use zarrs::metadata::v3::MetadataV3;
use zarrs::storage::StoreKey;
use zarrs::storage::store::MemoryStore;
use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};

mod chunk_item;
mod concurrency;
Expand All @@ -41,14 +42,14 @@ mod utils;
use crate::chunk_item::ChunksItem;
use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::metadata_v2::codec_metadata_v2_to_v3;
use crate::store::StoreManager;
use crate::store::StoreConfig;
use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _};

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
#[pyclass]
pub struct CodecPipelineImpl {
pub(crate) stores: StoreManager,
pub(crate) store: ReadableWritableListableStorage,
pub(crate) codec_chain: Arc<CodecChain>,
pub(crate) codec_options: CodecOptions,
pub(crate) chunk_concurrent_minimum: usize,
Expand All @@ -63,7 +64,7 @@ impl CodecPipelineImpl {
codec_chain: &CodecChain,
codec_options: &CodecOptions,
) -> PyResult<ArrayBytes<'a>> {
let value_encoded = self.stores.get(item)?;
let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
let value_decoded = if let Some(value_encoded) = value_encoded {
let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
codec_chain
Expand Down Expand Up @@ -94,15 +95,17 @@ impl CodecPipelineImpl {
.map_py_err::<PyValueError>()?;

if value_decoded.is_fill_value(item.representation().fill_value()) {
self.stores.erase(item)
self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
} else {
let value_encoded = codec_chain
.encode(value_decoded, item.representation(), codec_options)
.map(Cow::into_owned)
.map_py_err::<PyRuntimeError>()?;

// Store the encoded chunk
self.stores.set(item, value_encoded.into())
self.store
.set(item.key(), value_encoded.into())
.map_py_err::<PyRuntimeError>()
}
}

Expand Down Expand Up @@ -204,7 +207,8 @@ impl CodecPipelineImpl {
#[pymethods]
impl CodecPipelineImpl {
#[pyo3(signature = (
metadata,
array_metadata,
store_config,
*,
validate_checksums=None,
chunk_concurrent_minimum=None,
Expand All @@ -213,16 +217,21 @@ impl CodecPipelineImpl {
))]
#[new]
fn new(
metadata: &str,
array_metadata: &str,
store_config: StoreConfig,
validate_checksums: Option<bool>,
chunk_concurrent_minimum: Option<usize>,
chunk_concurrent_maximum: Option<usize>,
num_threads: Option<usize>,
) -> PyResult<Self> {
let metadata: Vec<MetadataV3> =
serde_json::from_str(metadata).map_py_err::<PyTypeError>()?;
let codec_chain =
Arc::new(CodecChain::from_metadata(&metadata).map_py_err::<PyTypeError>()?);
let metadata: ArrayMetadata =
serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;

// TODO: Add a direct metadata -> codec chain method to zarrs
let store = Arc::new(MemoryStore::new());
let array = Array::new_with_metadata(store, "/", metadata).map_py_err::<PyTypeError>()?;
let codec_chain = Arc::new(array.codecs().clone());

let mut codec_options = CodecOptionsBuilder::new();
if let Some(validate_checksums) = validate_checksums {
codec_options = codec_options.validate_checksums(validate_checksums);
Expand All @@ -235,8 +244,11 @@ impl CodecPipelineImpl {
chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
let num_threads = num_threads.unwrap_or(rayon::current_num_threads());

let store: ReadableWritableListableStorage =
(&store_config).try_into().map_py_err::<PyTypeError>()?;

Ok(Self {
stores: StoreManager::default(),
store,
codec_chain,
codec_options,
chunk_concurrent_minimum,
Expand Down Expand Up @@ -276,7 +288,9 @@ impl CodecPipelineImpl {
partial_chunk_descriptions,
map,
|item| {
let input_handle = self.stores.decoder(item)?;
let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
let input_handle =
StoragePartialDecoder::new(storage_handle, item.key().clone());
let partial_decoder = self
.codec_chain
.clone()
Expand Down Expand Up @@ -326,7 +340,9 @@ impl CodecPipelineImpl {
&& chunk_subset.shape() == item.representation().shape_u64()
{
// See zarrs::array::Array::retrieve_chunk_into
if let Some(chunk_encoded) = self.stores.get(&item)? {
if let Some(chunk_encoded) =
self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
{
// Decode the encoded data into the output buffer
let chunk_encoded: Vec<u8> = chunk_encoded.into();
self.codec_chain.decode_into(
Expand Down
4 changes: 1 addition & 3 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt};

mod filesystem;
mod http;
mod manager;

pub use self::filesystem::FilesystemStoreConfig;
pub use self::http::HttpStoreConfig;
pub(crate) use self::manager::StoreManager;

#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone)]
#[gen_stub_pyclass_enum]
pub enum StoreConfig {
Filesystem(FilesystemStoreConfig),
Expand Down
2 changes: 1 addition & 1 deletion src/store/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag

use crate::utils::PyErrExt;

#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub struct FilesystemStoreConfig {
Expand Down
2 changes: 1 addition & 1 deletion src/store/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage;

use super::opendal_builder_to_sync_store;

#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub struct HttpStoreConfig {
Expand Down
Loading
Loading