diff --git a/docs/user-guide/arrays.rst b/docs/user-guide/arrays.rst index b21f8e976c..110e12c3be 100644 --- a/docs/user-guide/arrays.rst +++ b/docs/user-guide/arrays.rst @@ -574,8 +574,41 @@ Any combination of integer and slice can be used for block indexing:: Sharding -------- -Coming soon. - +Using small chunk shapes in very large arrays can lead to a very large number of chunks. +This can become a performance issue for file systems and object storage. +With Zarr format 3, a new sharding feature has been added to address this issue. + +With sharding, multiple chunks can be stored in a single storage object (e.g. a file). +Within a shard, chunks are compressed and serialized separately. +This allows individual chunks to be read independently. +However, when writing data, a full shard must be written in one go for optimal +performance and to avoid concurrency issues. +That means that shards are the units of writing and chunks are the units of reading. +Users need to configure the chunk and shard shapes accordingly. + +Sharded arrays can be created by providing the ``shards`` parameter to :func:`zarr.create_array`. + + >>> a = zarr.create_array('data/example-20.zarr', shape=(10000, 10000), shards=(1000, 1000), chunks=(100, 100), dtype='uint8') + >>> a[:] = (np.arange(10000 * 10000) % 256).astype('uint8').reshape(10000, 10000) + >>> a.info_complete() + Type : Array + Zarr format : 3 + Data type : DataType.uint8 + Shape : (10000, 10000) + Shard shape : (1000, 1000) + Chunk shape : (100, 100) + Order : C + Read-only : False + Store type : LocalStore + Codecs : [{'chunk_shape': (100, 100), 'codecs': ({'endian': }, {'level': 0, 'checksum': False}), 'index_codecs': ({'endian': }, {}), 'index_location': }] + No. bytes : 100000000 (95.4M) + No. bytes stored : 3981060 + Storage ratio : 25.1 + Chunks Initialized : 100 + +In this example a shard shape of (1000, 1000) and a chunk shape of (100, 100) is used. +This means that 10*10 chunks are stored in each shard, and there are 10*10 shards in total. +Without the ``shards`` argument, there would be 10,000 chunks stored as individual files. Missing features in 3.0 ----------------------- diff --git a/docs/user-guide/performance.rst b/docs/user-guide/performance.rst index d2881fe536..f56b642fb1 100644 --- a/docs/user-guide/performance.rst +++ b/docs/user-guide/performance.rst @@ -62,6 +62,45 @@ will be one single chunk for the array:: >>> z5.chunks (10000, 10000) + +Sharding +~~~~~~~~ + +If you have large arrays but need small chunks to efficiently access the data, you can +use sharding. Sharding provides a mechanism to store multiple chunks in a single +storage object or file. This can be useful because traditional file systems and object +storage systems may have performance issues storing and accessing many files. +Additionally, small files can be inefficient to store if they are smaller than the +block size of the file system. + +Picking a good combination of chunk shape and shard shape is important for performance. +The chunk shape determines what unit of your data can be read independently, while the +shard shape determines what unit of your data can be written efficiently. + +For an example, consider you have a 100 GB array and need to read small chunks of 1 MB. +Without sharding, each chunk would be one file resulting in 100,000 files. That can +already cause performance issues on some file systems. +With sharding, you could use a shard size of 1 GB. This would result in 1000 chunks per +file and 100 files in total, which seems manageable for most storage systems. +You would still be able to read each 1 MB chunk independently, but you would need to +write your data in 1 GB increments. + +To use sharding, you need to specify the ``shards`` parameter when creating the array. + + >>> z6 = zarr.create_array(store={}, shape=(10000, 10000, 1000), shards=(1000, 1000, 1000), chunks=(100, 100, 100), dtype='uint8') + >>> z6.info + Type : Array + Zarr format : 3 + Data type : DataType.uint8 + Shape : (10000, 10000, 1000) + Shard shape : (1000, 1000, 1000) + Chunk shape : (100, 100, 100) + Order : C + Read-only : False + Store type : MemoryStore + Codecs : [{'chunk_shape': (100, 100, 100), 'codecs': ({'endian': }, {'level': 0, 'checksum': False}), 'index_codecs': ({'endian': }, {}), 'index_location': }] + No. bytes : 100000000000 (93.1G) + .. _user-guide-chunks-order: Chunk memory layout diff --git a/src/zarr/core/_info.py b/src/zarr/core/_info.py index 12bcc02e96..807e940508 100644 --- a/src/zarr/core/_info.py +++ b/src/zarr/core/_info.py @@ -80,6 +80,7 @@ class ArrayInfo: _zarr_format: ZarrFormat _data_type: np.dtype[Any] | DataType _shape: tuple[int, ...] + _shard_shape: tuple[int, ...] | None = None _chunk_shape: tuple[int, ...] | None = None _order: Literal["C", "F"] _read_only: bool @@ -96,7 +97,13 @@ def __repr__(self) -> str: Type : {_type} Zarr format : {_zarr_format} Data type : {_data_type} - Shape : {_shape} + Shape : {_shape}""") + + if self._shard_shape is not None: + template += textwrap.dedent(""" + Shard shape : {_shard_shape}""") + + template += textwrap.dedent(""" Chunk shape : {_chunk_shape} Order : {_order} Read-only : {_read_only} diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 6d8aca20ec..20e7f729aa 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -1573,14 +1573,8 @@ def _info( else: kwargs["_codecs"] = self.metadata.codecs kwargs["_data_type"] = self.metadata.data_type - # just regular? - chunk_grid = self.metadata.chunk_grid - if isinstance(chunk_grid, RegularChunkGrid): - kwargs["_chunk_shape"] = chunk_grid.chunk_shape - else: - raise NotImplementedError( - "'info' is not yet implemented for chunk grids of type {type(self.metadata.chunk_grid)}" - ) + kwargs["_chunk_shape"] = self.chunks + kwargs["_shard_shape"] = self.shards return ArrayInfo( _zarr_format=self.metadata.zarr_format, diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 5a1f069823..583ca01c5e 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -332,12 +332,21 @@ async def write_batch( drop_axes: tuple[int, ...] = (), ) -> None: if self.supports_partial_encode: - await self.encode_partial_batch( - [ - (byte_setter, value[out_selection], chunk_selection, chunk_spec) - for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info - ], - ) + # Pass scalar values as is + if len(value.shape) == 0: + await self.encode_partial_batch( + [ + (byte_setter, value, chunk_selection, chunk_spec) + for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info + ], + ) + else: + await self.encode_partial_batch( + [ + (byte_setter, value[out_selection], chunk_selection, chunk_spec) + for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info + ], + ) else: # Read existing bytes if not total slice diff --git a/tests/test_array.py b/tests/test_array.py index 51ad289e80..628b873e72 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -20,6 +20,7 @@ VLenUTF8Codec, ZstdCodec, ) +from zarr.codecs.sharding import ShardingCodec from zarr.core._info import ArrayInfo from zarr.core.array import ( CompressorsLike, @@ -478,121 +479,168 @@ def test_update_attrs(zarr_format: ZarrFormat) -> None: assert arr2.attrs["foo"] == "bar" +@pytest.mark.parametrize(("chunks", "shards"), [((2, 2), None), ((2, 2), (4, 4))]) class TestInfo: - def test_info_v2(self) -> None: - arr = zarr.create(shape=(4, 4), chunks=(2, 2), zarr_format=2) + def test_info_v2(self, chunks: tuple[int, int], shards: tuple[int, int] | None) -> None: + arr = zarr.create_array(store={}, shape=(8, 8), dtype="f8", chunks=chunks, zarr_format=2) result = arr.info expected = ArrayInfo( _zarr_format=2, _data_type=np.dtype("float64"), - _shape=(4, 4), - _chunk_shape=(2, 2), + _shape=(8, 8), + _chunk_shape=chunks, + _shard_shape=None, _order="C", _read_only=False, _store_type="MemoryStore", - _count_bytes=128, + _count_bytes=512, _compressor=numcodecs.Zstd(), ) assert result == expected - def test_info_v3(self) -> None: - arr = zarr.create(shape=(4, 4), chunks=(2, 2), zarr_format=3) + def test_info_v3(self, chunks: tuple[int, int], shards: tuple[int, int] | None) -> None: + arr = zarr.create_array(store={}, shape=(8, 8), dtype="f8", chunks=chunks, shards=shards) result = arr.info expected = ArrayInfo( _zarr_format=3, _data_type=DataType.parse("float64"), - _shape=(4, 4), - _chunk_shape=(2, 2), + _shape=(8, 8), + _chunk_shape=chunks, + _shard_shape=shards, _order="C", _read_only=False, _store_type="MemoryStore", - _codecs=[BytesCodec(), ZstdCodec()], - _count_bytes=128, + _codecs=[BytesCodec(), ZstdCodec()] + if shards is None + else [ShardingCodec(chunk_shape=chunks, codecs=[BytesCodec(), ZstdCodec()])], + _count_bytes=512, ) assert result == expected - def test_info_complete(self) -> None: - arr = zarr.create(shape=(4, 4), chunks=(2, 2), zarr_format=3, codecs=[BytesCodec()]) + def test_info_complete(self, chunks: tuple[int, int], shards: tuple[int, int] | None) -> None: + arr = zarr.create_array( + store={}, + shape=(8, 8), + dtype="f8", + chunks=chunks, + shards=shards, + compressors=(), + ) result = arr.info_complete() expected = ArrayInfo( _zarr_format=3, _data_type=DataType.parse("float64"), - _shape=(4, 4), - _chunk_shape=(2, 2), + _shape=(8, 8), + _chunk_shape=chunks, + _shard_shape=shards, _order="C", _read_only=False, _store_type="MemoryStore", - _codecs=[BytesCodec()], - _count_bytes=128, + _codecs=[BytesCodec()] if shards is None else [ShardingCodec(chunk_shape=chunks)], + _count_bytes=512, _count_chunks_initialized=0, - _count_bytes_stored=373, # the metadata? + _count_bytes_stored=373 if shards is None else 578, # the metadata? ) assert result == expected - arr[:2, :2] = 10 + arr[:4, :4] = 10 result = arr.info_complete() - expected = dataclasses.replace( - expected, _count_chunks_initialized=1, _count_bytes_stored=405 - ) + if shards is None: + expected = dataclasses.replace( + expected, _count_chunks_initialized=4, _count_bytes_stored=501 + ) + else: + expected = dataclasses.replace( + expected, _count_chunks_initialized=1, _count_bytes_stored=774 + ) assert result == expected - async def test_info_v2_async(self) -> None: - arr = await zarr.api.asynchronous.create(shape=(4, 4), chunks=(2, 2), zarr_format=2) + async def test_info_v2_async( + self, chunks: tuple[int, int], shards: tuple[int, int] | None + ) -> None: + arr = await zarr.api.asynchronous.create_array( + store={}, shape=(8, 8), dtype="f8", chunks=chunks, zarr_format=2 + ) result = arr.info expected = ArrayInfo( _zarr_format=2, _data_type=np.dtype("float64"), - _shape=(4, 4), + _shape=(8, 8), _chunk_shape=(2, 2), + _shard_shape=None, _order="C", _read_only=False, _store_type="MemoryStore", - _count_bytes=128, + _count_bytes=512, _compressor=numcodecs.Zstd(), ) assert result == expected - async def test_info_v3_async(self) -> None: - arr = await zarr.api.asynchronous.create(shape=(4, 4), chunks=(2, 2), zarr_format=3) + async def test_info_v3_async( + self, chunks: tuple[int, int], shards: tuple[int, int] | None + ) -> None: + arr = await zarr.api.asynchronous.create_array( + store={}, + shape=(8, 8), + dtype="f8", + chunks=chunks, + shards=shards, + ) result = arr.info expected = ArrayInfo( _zarr_format=3, _data_type=DataType.parse("float64"), - _shape=(4, 4), - _chunk_shape=(2, 2), + _shape=(8, 8), + _chunk_shape=chunks, + _shard_shape=shards, _order="C", _read_only=False, _store_type="MemoryStore", - _codecs=[BytesCodec(), ZstdCodec()], - _count_bytes=128, + _codecs=[BytesCodec(), ZstdCodec()] + if shards is None + else [ShardingCodec(chunk_shape=chunks, codecs=[BytesCodec(), ZstdCodec()])], + _count_bytes=512, ) assert result == expected - async def test_info_complete_async(self) -> None: - arr = await zarr.api.asynchronous.create( - shape=(4, 4), chunks=(2, 2), zarr_format=3, codecs=[BytesCodec()] + async def test_info_complete_async( + self, chunks: tuple[int, int], shards: tuple[int, int] | None + ) -> None: + arr = await zarr.api.asynchronous.create_array( + store={}, + dtype="f8", + shape=(8, 8), + chunks=chunks, + shards=shards, + compressors=None, ) result = await arr.info_complete() expected = ArrayInfo( _zarr_format=3, _data_type=DataType.parse("float64"), - _shape=(4, 4), - _chunk_shape=(2, 2), + _shape=(8, 8), + _chunk_shape=chunks, + _shard_shape=shards, _order="C", _read_only=False, _store_type="MemoryStore", - _codecs=[BytesCodec()], - _count_bytes=128, + _codecs=[BytesCodec()] if shards is None else [ShardingCodec(chunk_shape=chunks)], + _count_bytes=512, _count_chunks_initialized=0, - _count_bytes_stored=373, # the metadata? + _count_bytes_stored=373 if shards is None else 578, # the metadata? ) assert result == expected - await arr.setitem((slice(2), slice(2)), 10) + await arr.setitem((slice(4), slice(4)), 10) result = await arr.info_complete() - expected = dataclasses.replace( - expected, _count_chunks_initialized=1, _count_bytes_stored=405 - ) + if shards is None: + expected = dataclasses.replace( + expected, _count_chunks_initialized=4, _count_bytes_stored=501 + ) + else: + expected = dataclasses.replace( + expected, _count_chunks_initialized=1, _count_bytes_stored=774 + ) assert result == expected diff --git a/tests/test_codecs/test_sharding.py b/tests/test_codecs/test_sharding.py index 484cfa4eda..2ba57d7a39 100644 --- a/tests/test_codecs/test_sharding.py +++ b/tests/test_codecs/test_sharding.py @@ -70,6 +70,35 @@ def test_sharding( assert np.array_equal(data, read_data) +@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"]) +@pytest.mark.parametrize("index_location", ["start", "end"]) +@pytest.mark.parametrize("offset", [0, 10]) +def test_sharding_scalar( + store: Store, + index_location: ShardingCodecIndexLocation, + offset: int, +) -> None: + """ + Test that we can create an array with a sharding codec, write data to that array, and get + the same data out via indexing. + """ + spath = StorePath(store) + + arr = zarr.create_array( + spath, + shape=(128, 128), + chunks=(32, 32), + shards={"shape": (64, 64), "index_location": index_location}, + dtype="uint8", + fill_value=6, + filters=[TransposeCodec(order=order_from_dim("F", 2))], + compressors=BloscCodec(cname="lz4"), + ) + arr[:16, :16] = 10 # intentionally write partial chunks + read_data = arr[:16, :16] + np.testing.assert_array_equal(read_data, 10) + + @pytest.mark.parametrize("index_location", ["start", "end"]) @pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"]) @pytest.mark.parametrize(