Skip to content

Commit e7e88ed

Browse files
authored
feat: make write_empty_chunks dynamic (#103)
1 parent 51664a7 commit e7e88ed

File tree

5 files changed

+20
-13
lines changed

5 files changed

+20
-13
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,13 @@ zarr.config.set({
5858
"codec_pipeline": {
5959
"path": "zarrs.ZarrsCodecPipeline",
6060
"validate_checksums": True,
61-
"store_empty_chunks": False,
6261
"chunk_concurrent_maximum": None,
6362
"chunk_concurrent_minimum": 4,
6463
}
6564
})
6665
```
6766

68-
If the `ZarrsCodecPipeline` is pickled, and then un-pickled, and during that time one of `store_empty_chunks`, `chunk_concurrent_minimum`, `chunk_concurrent_maximum`, or `num_threads` has changed, the newly un-pickled version will pick up the new value. However, once a `ZarrsCodecPipeline` object has been instantiated, these values are then fixed. This may change in the future as guidance from the `zarr` community becomes clear.
67+
If the `ZarrsCodecPipeline` is pickled, and then un-pickled, and during that time one of `chunk_concurrent_minimum`, `chunk_concurrent_maximum`, or `num_threads` has changed, the newly un-pickled version will pick up the new value. However, once a `ZarrsCodecPipeline` object has been instantiated, these values are then fixed. This may change in the future as guidance from the `zarr` community becomes clear.
6968

7069
## Concurrency
7170

python/zarrs/_internal.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ class CodecPipelineImpl:
1616
metadata: builtins.str,
1717
*,
1818
validate_checksums: builtins.bool | None = None,
19-
store_empty_chunks: builtins.bool | None = None,
2019
chunk_concurrent_minimum: builtins.int | None = None,
2120
chunk_concurrent_maximum: builtins.int | None = None,
2221
num_threads: builtins.int | None = None,
@@ -30,6 +29,7 @@ class CodecPipelineImpl:
3029
self,
3130
chunk_descriptions: typing.Sequence[WithSubset],
3231
value: numpy.typing.NDArray[typing.Any],
32+
write_empty_chunks: builtins.bool,
3333
) -> None: ...
3434

3535
class FilesystemStoreConfig:

python/zarrs/pipeline.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
4545
return CodecPipelineImpl(
4646
codec_metadata_json,
4747
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
48-
store_empty_chunks=config.get("array.write_empty_chunks", None),
4948
chunk_concurrent_minimum=config.get(
5049
"codec_pipeline.chunk_concurrent_minimum", None
5150
),
@@ -185,7 +184,7 @@ async def read(
185184
out: NDArrayLike = out.as_ndarray_like()
186185
await asyncio.to_thread(
187186
self.impl.retrieve_chunks_and_apply_index,
188-
chunks_desc,
187+
chunks_desc.chunk_info_with_indices,
189188
out,
190189
)
191190
return None
@@ -224,7 +223,10 @@ async def write(
224223
elif not value_np.flags.c_contiguous:
225224
value_np = np.ascontiguousarray(value_np)
226225
await asyncio.to_thread(
227-
self.impl.store_chunks_with_indices, chunks_desc, value_np
226+
self.impl.store_chunks_with_indices,
227+
chunks_desc.chunk_info_with_indices,
228+
value_np,
229+
chunks_desc.write_empty_chunks,
228230
)
229231
return None
230232

python/zarrs/utils.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import operator
44
import os
5+
from dataclasses import dataclass
56
from functools import reduce
67
from typing import TYPE_CHECKING, Any
78

@@ -145,6 +146,12 @@ def get_implicit_fill_value(dtype: ZDType, fill_value: Any) -> Any:
145146
return fill_value
146147

147148

149+
@dataclass(frozen=True)
150+
class RustChunkInfo:
151+
chunk_info_with_indices: list[WithSubset]
152+
write_empty_chunks: bool
153+
154+
148155
def make_chunk_info_for_rust_with_indices(
149156
batch_info: Iterable[
150157
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]
@@ -154,13 +161,15 @@ def make_chunk_info_for_rust_with_indices(
154161
) -> list[WithSubset]:
155162
shape = shape if shape else (1,) # constant array
156163
chunk_info_with_indices: list[WithSubset] = []
164+
write_empty_chunks: bool = True
157165
for (
158166
byte_getter,
159167
chunk_spec,
160168
chunk_selection,
161169
out_selection,
162170
_,
163171
) in batch_info:
172+
write_empty_chunks = chunk_spec.config.write_empty_chunks
164173
if chunk_spec.fill_value is None:
165174
chunk_spec = ArraySpec(
166175
chunk_spec.shape,
@@ -193,4 +202,4 @@ def make_chunk_info_for_rust_with_indices(
193202
shape=shape,
194203
)
195204
)
196-
return chunk_info_with_indices
205+
return RustChunkInfo(chunk_info_with_indices, write_empty_chunks)

src/lib.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ impl CodecPipelineImpl {
207207
metadata,
208208
*,
209209
validate_checksums=None,
210-
store_empty_chunks=None,
211210
chunk_concurrent_minimum=None,
212211
chunk_concurrent_maximum=None,
213212
num_threads=None,
@@ -216,7 +215,6 @@ impl CodecPipelineImpl {
216215
fn new(
217216
metadata: &str,
218217
validate_checksums: Option<bool>,
219-
store_empty_chunks: Option<bool>,
220218
chunk_concurrent_minimum: Option<usize>,
221219
chunk_concurrent_maximum: Option<usize>,
222220
num_threads: Option<usize>,
@@ -229,9 +227,6 @@ impl CodecPipelineImpl {
229227
if let Some(validate_checksums) = validate_checksums {
230228
codec_options = codec_options.validate_checksums(validate_checksums);
231229
}
232-
if let Some(store_empty_chunks) = store_empty_chunks {
233-
codec_options = codec_options.store_empty_chunks(store_empty_chunks);
234-
}
235230
let codec_options = codec_options.build();
236231

237232
let chunk_concurrent_minimum = chunk_concurrent_minimum
@@ -378,6 +373,7 @@ impl CodecPipelineImpl {
378373
py: Python,
379374
chunk_descriptions: Vec<chunk_item::WithSubset>,
380375
value: &Bound<'_, PyUntypedArray>,
376+
write_empty_chunks: bool,
381377
) -> PyResult<()> {
382378
enum InputValue<'a> {
383379
Array(ArrayBytes<'a>),
@@ -395,11 +391,12 @@ impl CodecPipelineImpl {
395391
let input_shape: Vec<u64> = value.shape_zarr()?;
396392

397393
// Adjust the concurrency based on the codec chain and the first chunk description
398-
let Some((chunk_concurrent_limit, codec_options)) =
394+
let Some((chunk_concurrent_limit, mut codec_options)) =
399395
chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)?
400396
else {
401397
return Ok(());
402398
};
399+
codec_options.set_store_empty_chunks(write_empty_chunks);
403400

404401
py.allow_threads(move || {
405402
let store_chunk = |item: chunk_item::WithSubset| match &input {

0 commit comments

Comments
 (0)