Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,13 @@ zarr.config.set({
"codec_pipeline": {
"path": "zarrs.ZarrsCodecPipeline",
"validate_checksums": True,
"store_empty_chunks": False,
"chunk_concurrent_maximum": None,
"chunk_concurrent_minimum": 4,
}
})
```

If the `ZarrsCodecPipeline` is pickled, and then un-pickled, and during that time one of `store_empty_chunks`, `chunk_concurrent_minimum`, `chunk_concurrent_maximum`, or `num_threads` has changed, the newly un-pickled version will pick up the new value. However, once a `ZarrsCodecPipeline` object has been instantiated, these values are then fixed. This may change in the future as guidance from the `zarr` community becomes clear.
If the `ZarrsCodecPipeline` is pickled, and then un-pickled, and during that time one of `chunk_concurrent_minimum`, `chunk_concurrent_maximum`, or `num_threads` has changed, the newly un-pickled version will pick up the new value. However, once a `ZarrsCodecPipeline` object has been instantiated, these values are then fixed. This may change in the future as guidance from the `zarr` community becomes clear.

## Concurrency

Expand Down
2 changes: 1 addition & 1 deletion python/zarrs/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class CodecPipelineImpl:
metadata: builtins.str,
*,
validate_checksums: builtins.bool | None = None,
store_empty_chunks: builtins.bool | None = None,
chunk_concurrent_minimum: builtins.int | None = None,
chunk_concurrent_maximum: builtins.int | None = None,
num_threads: builtins.int | None = None,
Expand All @@ -30,6 +29,7 @@ class CodecPipelineImpl:
self,
chunk_descriptions: typing.Sequence[WithSubset],
value: numpy.typing.NDArray[typing.Any],
write_empty_chunks: builtins.bool,
) -> None: ...

class FilesystemStoreConfig:
Expand Down
8 changes: 5 additions & 3 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
return CodecPipelineImpl(
codec_metadata_json,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
store_empty_chunks=config.get("array.write_empty_chunks", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
),
Expand Down Expand Up @@ -184,7 +183,7 @@ async def read(
out: NDArrayLike = out.as_ndarray_like()
await asyncio.to_thread(
self.impl.retrieve_chunks_and_apply_index,
chunks_desc,
chunks_desc.chunk_info_with_indices,
out,
)
return None
Expand Down Expand Up @@ -223,7 +222,10 @@ async def write(
elif not value_np.flags.c_contiguous:
value_np = np.ascontiguousarray(value_np)
await asyncio.to_thread(
self.impl.store_chunks_with_indices, chunks_desc, value_np
self.impl.store_chunks_with_indices,
chunks_desc.chunk_info_with_indices,
value_np,
chunks_desc.write_empty_chunks,
)
return None

Expand Down
11 changes: 10 additions & 1 deletion python/zarrs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import operator
import os
from dataclasses import dataclass
from functools import reduce
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -145,6 +146,12 @@ def get_implicit_fill_value(dtype: np.dtype, fill_value: Any) -> Any:
return fill_value


@dataclass(frozen=True)
class RustChunkInfo:
chunk_info_with_indices: list[WithSubset]
write_empty_chunks: bool


def make_chunk_info_for_rust_with_indices(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]
Expand All @@ -154,13 +161,15 @@ def make_chunk_info_for_rust_with_indices(
) -> list[WithSubset]:
shape = shape if shape else (1,) # constant array
chunk_info_with_indices: list[WithSubset] = []
write_empty_chunks: bool = True
for (
byte_getter,
chunk_spec,
chunk_selection,
out_selection,
_,
) in batch_info:
write_empty_chunks = chunk_spec.config.write_empty_chunks
if chunk_spec.fill_value is None:
chunk_spec = ArraySpec(
chunk_spec.shape,
Expand Down Expand Up @@ -193,4 +202,4 @@ def make_chunk_info_for_rust_with_indices(
shape=shape,
)
)
return chunk_info_with_indices
return RustChunkInfo(chunk_info_with_indices, write_empty_chunks)
9 changes: 3 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ impl CodecPipelineImpl {
metadata,
*,
validate_checksums=None,
store_empty_chunks=None,
chunk_concurrent_minimum=None,
chunk_concurrent_maximum=None,
num_threads=None,
Expand All @@ -216,7 +215,6 @@ impl CodecPipelineImpl {
fn new(
metadata: &str,
validate_checksums: Option<bool>,
store_empty_chunks: Option<bool>,
chunk_concurrent_minimum: Option<usize>,
chunk_concurrent_maximum: Option<usize>,
num_threads: Option<usize>,
Expand All @@ -229,9 +227,6 @@ impl CodecPipelineImpl {
if let Some(validate_checksums) = validate_checksums {
codec_options = codec_options.validate_checksums(validate_checksums);
}
if let Some(store_empty_chunks) = store_empty_chunks {
codec_options = codec_options.store_empty_chunks(store_empty_chunks);
}
let codec_options = codec_options.build();

let chunk_concurrent_minimum = chunk_concurrent_minimum
Expand Down Expand Up @@ -378,6 +373,7 @@ impl CodecPipelineImpl {
py: Python,
chunk_descriptions: Vec<chunk_item::WithSubset>,
value: &Bound<'_, PyUntypedArray>,
write_empty_chunks: bool,
) -> PyResult<()> {
enum InputValue<'a> {
Array(ArrayBytes<'a>),
Expand All @@ -395,11 +391,12 @@ impl CodecPipelineImpl {
let input_shape: Vec<u64> = value.shape_zarr()?;

// Adjust the concurrency based on the codec chain and the first chunk description
let Some((chunk_concurrent_limit, codec_options)) =
let Some((chunk_concurrent_limit, mut codec_options)) =
chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)?
else {
return Ok(());
};
codec_options.set_store_empty_chunks(write_empty_chunks);

py.allow_threads(move || {
let store_chunk = |item: chunk_item::WithSubset| match &input {
Expand Down
Loading