Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
ChunkCoords,
MemoryOrder,
ZarrFormat,
_warn_order_kwarg,
_warn_write_empty_chunks_kwarg,
)
from zarr.core.config import config
from zarr.core.group import AsyncGroup, ConsolidatedMetadata, GroupMetadata
Expand Down Expand Up @@ -724,7 +726,7 @@ async def create(
read_only: bool | None = None,
object_codec: Codec | None = None, # TODO: type has changed
dimension_separator: Literal[".", "/"] | None = None,
write_empty_chunks: bool = False, # TODO: default has changed
write_empty_chunks: bool | None = None,
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # TODO: need type
Expand Down Expand Up @@ -760,6 +762,7 @@ async def create(
fill_value : object
Default value to use for uninitialized portions of the array.
order : {'C', 'F'}, optional
Deprecated in favor of the `array.order` configuration variable.
Memory layout to be used within each chunk.
Default is set in Zarr's config (`array.order`).
store : Store or str
Expand Down Expand Up @@ -795,6 +798,8 @@ async def create(
.. versionadded:: 2.8

write_empty_chunks : bool, optional
Deprecated in favor of the `array.write_empty_chunks` configuration variable.

If True, all chunks will be stored regardless of their
contents. If False, each chunk is compared to the array's fill value
prior to storing. If a chunk is uniformly equal to the fill value, then
Expand Down Expand Up @@ -850,14 +855,12 @@ async def create(
raise ValueError(
"dimension_separator is not supported for zarr format 3, use chunk_key_encoding instead"
)
else:
warnings.warn(
"dimension_separator is not yet implemented",
RuntimeWarning,
stacklevel=2,
)
if write_empty_chunks:
warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2)

if order is not None:
_warn_order_kwarg()
if write_empty_chunks is not None:
_warn_write_empty_chunks_kwarg()

if meta_array is not None:
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)

Expand Down Expand Up @@ -885,6 +888,7 @@ async def create(
dimension_names=dimension_names,
attributes=attributes,
order=order,
write_empty_chunks=write_empty_chunks,
**kwargs,
)

Expand Down Expand Up @@ -1058,6 +1062,11 @@ async def open_array(

zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)

if "order" in kwargs:
_warn_order_kwarg()
if "write_empty_chunks" in kwargs:
_warn_write_empty_chunks_kwarg()

try:
return await AsyncArray.open(store_path, zarr_format=zarr_format)
except FileNotFoundError:
Expand Down
28 changes: 16 additions & 12 deletions src/zarr/codecs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async def _read_key(
_read_key,
config.get("async.concurrency"),
)
chunk_array_batch = await self.decode_batch(
chunk_array_decoded = await self.decode_batch(
[
(chunk_bytes, chunk_spec)
for chunk_bytes, (_, chunk_spec, _, _) in zip(
Expand All @@ -369,23 +369,27 @@ async def _read_key(
],
)

chunk_array_batch = [
chunk_array_merged = [
self._merge_chunk_array(
chunk_array, value, out_selection, chunk_spec, chunk_selection, drop_axes
)
for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
chunk_array_batch, batch_info, strict=False
)
]

chunk_array_batch = [
None
if chunk_array is None or chunk_array.all_equal(chunk_spec.fill_value)
else chunk_array
for chunk_array, (_, chunk_spec, _, _) in zip(
chunk_array_batch, batch_info, strict=False
chunk_array_decoded, batch_info, strict=False
)
]
chunk_array_batch: list[NDBuffer | None] = []
for chunk_array, (_, chunk_spec, _, _) in zip(
chunk_array_merged, batch_info, strict=False
):
if chunk_array is None:
chunk_array_batch.append(None) # type: ignore[unreachable]
else:
if not chunk_spec.config.write_empty_chunks and chunk_array.all_equal(
chunk_spec.fill_value
):
chunk_array_batch.append(None)
else:
chunk_array_batch.append(chunk_array)

chunk_bytes_batch = await self.encode_batch(
[
Expand Down
8 changes: 5 additions & 3 deletions src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from zarr.abc.store import ByteGetter, ByteRangeRequest, ByteSetter
from zarr.codecs.bytes import BytesCodec
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.core.array_spec import ArraySpec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.buffer import (
Buffer,
BufferPrototype,
Expand Down Expand Up @@ -665,7 +665,9 @@ def _get_index_chunk_spec(self, chunks_per_shard: ChunkCoords) -> ArraySpec:
shape=chunks_per_shard + (2,),
dtype=np.dtype("<u8"),
fill_value=MAX_UINT_64,
order="C", # Note: this is hard-coded for simplicity -- it is not surfaced into user code
config=ArrayConfig(
order="C", write_empty_chunks=False
), # Note: this is hard-coded for simplicity -- it is not surfaced into user code,
prototype=numpy_buffer_prototype(),
)

Expand All @@ -674,7 +676,7 @@ def _get_chunk_spec(self, shard_spec: ArraySpec) -> ArraySpec:
shape=self.chunk_shape,
dtype=shard_spec.dtype,
fill_value=shard_spec.fill_value,
order=shard_spec.order,
config=shard_spec.config,
prototype=shard_spec.prototype,
)

Expand Down
2 changes: 1 addition & 1 deletion src/zarr/codecs/transpose.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
shape=tuple(chunk_spec.shape[self.order[i]] for i in range(chunk_spec.ndim)),
dtype=chunk_spec.dtype,
fill_value=chunk_spec.fill_value,
order=chunk_spec.order,
config=chunk_spec.config,
prototype=chunk_spec.prototype,
)

Expand Down
Loading
Loading