Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3638.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add methods to store classes for reading stored objects as raw bytes or as parsed JSON.
278 changes: 267 additions & 11 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
from __future__ import annotations

import asyncio
import json
from abc import ABC, abstractmethod
from asyncio import gather
from dataclasses import dataclass
from itertools import starmap
from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable

from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.sync import sync

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from types import TracebackType
from typing import Any, Self, TypeAlias

from zarr.core.buffer import Buffer, BufferPrototype
__all__ = ["BufferLike", "ByteGetter", "ByteSetter", "Store", "set_or_delete"]

__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
BufferLike = type[Buffer] | BufferPrototype


@dataclass
Expand Down Expand Up @@ -180,20 +185,30 @@ def __eq__(self, value: object) -> bool:
"""Equality comparison."""
...

def _get_default_buffer_class(self) -> type[Buffer]:
    """
    Return the buffer class of the default buffer prototype.

    Returns
    -------
    type[Buffer]
        The ``buffer`` attribute of the object returned by
        ``default_buffer_prototype()``.
    """
    default_prototype = default_buffer_prototype()
    return default_prototype.buffer

@abstractmethod
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferLike | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None:
"""Retrieve the value associated with a given key.

Parameters
----------
key : str
prototype : BufferPrototype
The prototype of the output buffer. Stores may support a default buffer prototype.
prototype : BufferLike | None, optional
The prototype of the output buffer.
Can be either a Buffer class or an instance of `BufferPrototype`, in which the
`buffer` attribute will be used.
If `None`, the default buffer class for this store will be retrieved via the
``_get_default_buffer_class`` method.
byte_range : ByteRequest, optional
ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.
- RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
Expand All @@ -206,18 +221,259 @@ async def get(
"""
...

async def get_bytes(
    self,
    key: str,
    *,
    prototype: BufferLike | None = None,
    byte_range: ByteRequest | None = None,
) -> bytes:
    """
    Retrieve raw bytes from the store asynchronously.

    This is a convenience method that wraps ``get()`` and converts the
    resulting buffer to ``bytes``. Unlike ``get()``, a missing key is
    reported by raising rather than by returning ``None``.

    Parameters
    ----------
    key : str
        The key identifying the data to retrieve.
    prototype : BufferLike | None, optional
        The prototype of the output buffer.
        Can be either a Buffer class or an instance of `BufferPrototype`, in which
        case the `buffer` attribute will be used.
        The value is forwarded unchanged to ``get()``; per the ``get()`` contract,
        `None` selects the default buffer class for this store, obtained via the
        ``_get_default_buffer_class`` method.
    byte_range : ByteRequest, optional
        If specified, only retrieve a portion of the stored data.
        Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.

    Returns
    -------
    bytes
        The raw bytes stored at the given key.

    Raises
    ------
    FileNotFoundError
        If ``get()`` returns ``None`` for the key, i.e. the key does not exist
        in the store.

    See Also
    --------
    get : Lower-level method that returns a Buffer object.
    get_bytes_sync : Synchronous version of this method.
    get_json : Asynchronous method for retrieving and parsing JSON data.

    Examples
    --------
    >>> store = await MemoryStore.open()
    >>> await store.set("data", Buffer.from_bytes(b"hello world"))
    >>> data = await store.get_bytes("data", prototype=default_buffer_prototype())
    >>> print(data)
    b'hello world'
    """
    buffer = await self.get(key, prototype, byte_range)
    if buffer is None:
        # ``get()`` signals a missing key with None; surface it as an exception
        # so callers always receive real bytes.
        raise FileNotFoundError(key)
    return buffer.to_bytes()

def get_bytes_sync(
    self,
    key: str = "",
    *,
    prototype: BufferLike | None = None,
    byte_range: ByteRequest | None = None,
) -> bytes:
    """
    Retrieve raw bytes from the store synchronously.

    Builds the ``get_bytes()`` coroutine for the given arguments and runs it
    to completion through ``sync``. Intended for non-async callers; async
    code should await ``get_bytes()`` directly.

    Parameters
    ----------
    key : str, optional
        The key identifying the data to retrieve. Defaults to an empty string.
    prototype : BufferLike | None, optional
        The prototype of the output buffer, passed through to ``get_bytes()``.
        Can be either a Buffer class or an instance of `BufferPrototype`.
    byte_range : ByteRequest, optional
        If specified, only retrieve a portion of the stored data.
        Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.

    Returns
    -------
    bytes
        The raw bytes stored at the given key.

    Raises
    ------
    FileNotFoundError
        If the key does not exist in the store.

    Warnings
    --------
    Do not call this method from async functions. Use ``get_bytes()`` instead
    to avoid blocking the event loop.

    See Also
    --------
    get_bytes : Asynchronous version of this method.
    get_json_sync : Synchronous method for retrieving and parsing JSON data.

    Examples
    --------
    >>> store = MemoryStore()
    >>> await store.set("data", Buffer.from_bytes(b"hello world"))
    >>> data = store.get_bytes_sync("data", prototype=default_buffer_prototype())
    >>> print(data)
    b'hello world'
    """
    pending = self.get_bytes(key, prototype=prototype, byte_range=byte_range)
    return sync(pending)

async def get_json(
    self,
    key: str,
    *,
    prototype: BufferLike | None = None,
    byte_range: ByteRequest | None = None,
) -> Any:
    """
    Retrieve a stored value and decode it as JSON, asynchronously.

    The bytes are fetched with ``get_bytes()`` and then handed to
    ``json.loads()``.

    Parameters
    ----------
    key : str
        The key identifying the JSON data to retrieve.
    prototype : BufferLike | None, optional
        The prototype of the output buffer, passed through to ``get_bytes()``.
        Can be either a Buffer class or an instance of `BufferPrototype`.
    byte_range : ByteRequest, optional
        If specified, only retrieve a portion of the stored data.
        Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.
        Note: Using byte ranges with JSON may result in invalid JSON.

    Returns
    -------
    Any
        Whatever ``json.loads()`` produces for the stored document: dict,
        list, str, int, float, bool, or None.

    Raises
    ------
    FileNotFoundError
        If the key does not exist in the store.
    json.JSONDecodeError
        If the stored data is not valid JSON.

    See Also
    --------
    get_bytes : Method for retrieving raw bytes.
    get_json_sync : Synchronous version of this method.

    Examples
    --------
    >>> store = await MemoryStore.open()
    >>> metadata = {"zarr_format": 3, "node_type": "array"}
    >>> await store.set("zarr.json", Buffer.from_bytes(json.dumps(metadata).encode()))
    >>> data = await store.get_json("zarr.json", prototype=default_buffer_prototype())
    >>> print(data)
    {'zarr_format': 3, 'node_type': 'array'}
    """
    raw = await self.get_bytes(key, prototype=prototype, byte_range=byte_range)
    return json.loads(raw)

def get_json_sync(
    self,
    key: str = "",
    *,
    prototype: BufferLike | None = None,
    byte_range: ByteRequest | None = None,
) -> Any:
    """
    Retrieve and parse JSON data from the store synchronously.

    This is a synchronous wrapper around ``get_json()``: it runs that
    coroutine to completion via ``sync``. It should only be called from
    non-async code. For async contexts, use ``get_json()`` instead.

    Parameters
    ----------
    key : str, optional
        The key identifying the JSON data to retrieve. Defaults to an empty string.
    prototype : BufferLike | None, optional
        The prototype of the output buffer.
        Can be either a Buffer class or an instance of `BufferPrototype`, in which
        case the `buffer` attribute will be used.
        The value is forwarded unchanged down to ``get()``; per the ``get()``
        contract, `None` selects the default buffer class for this store,
        obtained via the ``_get_default_buffer_class`` method.
    byte_range : ByteRequest, optional
        If specified, only retrieve a portion of the stored data.
        Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.
        Note: Using byte ranges with JSON may result in invalid JSON.

    Returns
    -------
    Any
        The parsed JSON data. This follows the behavior of ``json.loads()`` and
        can be any JSON-serializable type: dict, list, str, int, float, bool, or None.

    Raises
    ------
    FileNotFoundError
        If the key does not exist in the store.
    json.JSONDecodeError
        If the stored data is not valid JSON.

    Warnings
    --------
    Do not call this method from async functions. Use ``get_json()`` instead
    to avoid blocking the event loop.

    See Also
    --------
    get_json : Asynchronous version of this method.
    get_bytes_sync : Synchronous method for retrieving raw bytes without parsing.

    Examples
    --------
    >>> store = MemoryStore()
    >>> metadata = {"zarr_format": 3, "node_type": "array"}
    >>> await store.set("zarr.json", Buffer.from_bytes(json.dumps(metadata).encode()))
    >>> data = store.get_json_sync("zarr.json", prototype=default_buffer_prototype())
    >>> print(data)
    {'zarr_format': 3, 'node_type': 'array'}
    """
    return sync(self.get_json(key, prototype=prototype, byte_range=byte_range))

@abstractmethod
async def get_partial_values(
self,
prototype: BufferPrototype,
prototype: BufferLike | None,
key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
"""Retrieve possibly partial values from given key_ranges.

Parameters
----------
prototype : BufferPrototype
The prototype of the output buffer. Stores may support a default buffer prototype.
prototype : BufferLike | None
The prototype of the output buffer.
Can be either a Buffer class or an instance of `BufferPrototype`, in which the
`buffer` attribute will be used.
If `None`, the default buffer prototype for this store will be retrieved via the
``_get_default_buffer_class`` method.
key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
Ordered set of key, range pairs, a key may occur multiple times with different ranges

Expand Down Expand Up @@ -278,7 +534,7 @@ async def _set_many(self, values: Iterable[tuple[str, Buffer]]) -> None:
"""
Insert multiple (key, value) pairs into storage.
"""
await gather(*starmap(self.set, values))
await asyncio.gather(*starmap(self.set, values))

@property
def supports_consolidated_metadata(self) -> bool:
Expand Down Expand Up @@ -389,7 +645,7 @@ def close(self) -> None:
self._is_open = False

async def _get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
self, requests: Iterable[tuple[str, BufferLike | None, ByteRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
"""
Retrieve a collection of objects from storage. In general this method does not guarantee
Expand Down
Loading