diff --git a/README.md b/README.md index 4883ea8..c7c7663 100644 --- a/README.md +++ b/README.md @@ -58,14 +58,13 @@ zarr.config.set({ "codec_pipeline": { "path": "zarrs.ZarrsCodecPipeline", "validate_checksums": True, - "store_empty_chunks": False, "chunk_concurrent_maximum": None, "chunk_concurrent_minimum": 4, } }) ``` -If the `ZarrsCodecPipeline` is pickled, and then un-pickled, and during that time one of `store_empty_chunks`, `chunk_concurrent_minimum`, `chunk_concurrent_maximum`, or `num_threads` has changed, the newly un-pickled version will pick up the new value. However, once a `ZarrsCodecPipeline` object has been instantiated, these values are then fixed. This may change in the future as guidance from the `zarr` community becomes clear. +If the `ZarrsCodecPipeline` is pickled, and then un-pickled, and during that time one of `chunk_concurrent_minimum`, `chunk_concurrent_maximum`, or `num_threads` has changed, the newly un-pickled version will pick up the new value. However, once a `ZarrsCodecPipeline` object has been instantiated, these values are then fixed. This may change in the future as guidance from the `zarr` community becomes clear. ## Concurrency diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index b5fa0f5..ecbf742 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -16,7 +16,6 @@ class CodecPipelineImpl: metadata: builtins.str, *, validate_checksums: builtins.bool | None = None, - store_empty_chunks: builtins.bool | None = None, chunk_concurrent_minimum: builtins.int | None = None, chunk_concurrent_maximum: builtins.int | None = None, num_threads: builtins.int | None = None, @@ -30,6 +29,7 @@ class CodecPipelineImpl: self, chunk_descriptions: typing.Sequence[WithSubset], value: numpy.typing.NDArray[typing.Any], + write_empty_chunks: builtins.bool, ) -> None: ... class FilesystemStoreConfig: diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index 6ebc24d..b3a65ef 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -44,7 +44,6 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non return CodecPipelineImpl( codec_metadata_json, validate_checksums=config.get("codec_pipeline.validate_checksums", None), - store_empty_chunks=config.get("array.write_empty_chunks", None), chunk_concurrent_minimum=config.get( "codec_pipeline.chunk_concurrent_minimum", None ), @@ -184,7 +183,7 @@ async def read( out: NDArrayLike = out.as_ndarray_like() await asyncio.to_thread( self.impl.retrieve_chunks_and_apply_index, - chunks_desc, + chunks_desc.chunk_info_with_indices, out, ) return None @@ -223,7 +222,10 @@ async def write( elif not value_np.flags.c_contiguous: value_np = np.ascontiguousarray(value_np) await asyncio.to_thread( - self.impl.store_chunks_with_indices, chunks_desc, value_np + self.impl.store_chunks_with_indices, + chunks_desc.chunk_info_with_indices, + value_np, + chunks_desc.write_empty_chunks, ) return None diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 35743b8..315e29b 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -2,6 +2,7 @@ import operator import os +from dataclasses import dataclass from functools import reduce from typing import TYPE_CHECKING, Any @@ -145,6 +146,12 @@ def get_implicit_fill_value(dtype: np.dtype, fill_value: Any) -> Any: return fill_value +@dataclass(frozen=True) +class RustChunkInfo: + chunk_info_with_indices: list[WithSubset] + write_empty_chunks: bool + + def make_chunk_info_for_rust_with_indices( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool] @@ -154,6 +161,7 @@ def make_chunk_info_for_rust_with_indices( ) -> list[WithSubset]: shape = shape if shape else (1,) # constant array chunk_info_with_indices: list[WithSubset] = [] + write_empty_chunks: bool = True for ( byte_getter, chunk_spec, @@ -161,6 +169,7 @@ def make_chunk_info_for_rust_with_indices( out_selection, _, ) in batch_info: + write_empty_chunks = chunk_spec.config.write_empty_chunks if chunk_spec.fill_value is None: chunk_spec = ArraySpec( chunk_spec.shape, @@ -193,4 +202,4 @@ def make_chunk_info_for_rust_with_indices( shape=shape, ) ) - return chunk_info_with_indices + return RustChunkInfo(chunk_info_with_indices, write_empty_chunks) diff --git a/src/lib.rs b/src/lib.rs index afd70cd..543cffb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -207,7 +207,6 @@ impl CodecPipelineImpl { metadata, *, validate_checksums=None, - store_empty_chunks=None, chunk_concurrent_minimum=None, chunk_concurrent_maximum=None, num_threads=None, @@ -216,7 +215,6 @@ impl CodecPipelineImpl { fn new( metadata: &str, validate_checksums: Option, - store_empty_chunks: Option, chunk_concurrent_minimum: Option, chunk_concurrent_maximum: Option, num_threads: Option, @@ -229,9 +227,6 @@ impl CodecPipelineImpl { if let Some(validate_checksums) = validate_checksums { codec_options = codec_options.validate_checksums(validate_checksums); } - if let Some(store_empty_chunks) = store_empty_chunks { - codec_options = codec_options.store_empty_chunks(store_empty_chunks); - } let codec_options = codec_options.build(); let chunk_concurrent_minimum = chunk_concurrent_minimum @@ -378,6 +373,7 @@ impl CodecPipelineImpl { py: Python, chunk_descriptions: Vec, value: &Bound<'_, PyUntypedArray>, + write_empty_chunks: bool, ) -> PyResult<()> { enum InputValue<'a> { Array(ArrayBytes<'a>), @@ -395,11 +391,12 @@ impl CodecPipelineImpl { let input_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, codec_options)) = + let Some((chunk_concurrent_limit, mut codec_options)) = chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? else { return Ok(()); }; + codec_options.set_store_empty_chunks(write_empty_chunks); py.allow_threads(move || { let store_chunk = |item: chunk_item::WithSubset| match &input {