
Commit 0edd023

Merge branch 'main' into release-notes

2 parents: 4ee071b + 4518653

File tree: 12 files changed, +111 -29 lines

src/zarr/codecs/bytes.py

Lines changed: 1 addition & 1 deletion

@@ -114,7 +114,7 @@ async def _encode_single(
         nd_array = chunk_array.as_ndarray_like()
         # Flatten the nd-array (only copy if needed) and reinterpret as bytes
-        nd_array = nd_array.ravel().view(dtype="b")
+        nd_array = nd_array.ravel().view(dtype="B")
         return chunk_spec.prototype.buffer.from_array_like(nd_array)

     def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
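
Editor's note: this change (and the matching ones throughout the commit) swaps NumPy's signed byte dtype "b" (int8) for the unsigned byte dtype "B" (uint8), so raw chunk bytes read as 0-255 rather than -128-127. A minimal sketch of the reinterpretation, assuming only NumPy:

import numpy as np

arr = np.arange(4, dtype=np.float32)   # 16 bytes of data
raw = arr.ravel().view(dtype="B")      # reinterpret as unsigned bytes (uint8)

assert raw.dtype == np.uint8
assert raw.nbytes == arr.nbytes        # same memory; no copy for contiguous input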

src/zarr/codecs/crc32c_.py

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ async def _encode_single(
         # Calculate the checksum and "cast" it to a numpy array
         checksum = np.array([crc32c(cast(typing_extensions.Buffer, data))], dtype=np.uint32)
         # Append the checksum (as bytes) to the data
-        return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("b")))
+        return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B")))

     def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
         return input_byte_length + 4
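
Editor's note: the checksum is a single uint32 viewed as four unsigned bytes and appended to the payload, which is why compute_encoded_size adds exactly 4. A standalone sketch of the same framing, assuming the crc32c PyPI package:

import numpy as np
from crc32c import crc32c  # assumes the `crc32c` PyPI package

data = np.frombuffer(b"hello", dtype="B")
checksum = np.array([crc32c(data)], dtype=np.uint32)
encoded = np.append(data, checksum.view("B"))  # payload followed by 4 checksum bytes

assert encoded.nbytes == data.nbytes + 4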

src/zarr/core/buffer/core.py

Lines changed: 2 additions & 2 deletions

@@ -143,7 +143,7 @@ class Buffer(ABC):
     def __init__(self, array_like: ArrayLike) -> None:
         if array_like.ndim != 1:
             raise ValueError("array_like: only 1-dim allowed")
-        if array_like.dtype != np.dtype("b"):
+        if array_like.dtype != np.dtype("B"):
             raise ValueError("array_like: only byte dtype allowed")
         self._data = array_like

@@ -306,7 +306,7 @@ class NDBuffer:
     Notes
     -----
     The two buffer classes Buffer and NDBuffer are very similar. In fact, Buffer
-    is a special case of NDBuffer where dim=1, stride=1, and dtype="b". However,
+    is a special case of NDBuffer where dim=1, stride=1, and dtype="B". However,
     in order to use Python's type system to differentiate between the contiguous
     Buffer and the n-dim (non-contiguous) NDBuffer, we keep the definition of the
     two classes separate.
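
Editor's note: after this change the Buffer constructor accepts only 1-D uint8 arrays. A quick illustration, assuming zarr's cpu Buffer (which inherits this check):

import numpy as np
from zarr.core.buffer import cpu

cpu.Buffer(np.zeros(4, dtype="B"))      # accepted: 1-D unsigned bytes
try:
    cpu.Buffer(np.zeros(4, dtype="b"))  # signed bytes are now rejected
except ValueError as err:
    print(err)                          # "array_like: only byte dtype allowed"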

src/zarr/core/buffer/cpu.py

Lines changed: 4 additions & 4 deletions

@@ -49,7 +49,7 @@ def __init__(self, array_like: ArrayLike) -> None:

     @classmethod
     def create_zero_length(cls) -> Self:
-        return cls(np.array([], dtype="b"))
+        return cls(np.array([], dtype="B"))

     @classmethod
     def from_buffer(cls, buffer: core.Buffer) -> Self:

@@ -92,7 +92,7 @@ def from_bytes(cls, bytes_like: BytesLike) -> Self:
         -------
         New buffer representing `bytes_like`
         """
-        return cls.from_array_like(np.frombuffer(bytes_like, dtype="b"))
+        return cls.from_array_like(np.frombuffer(bytes_like, dtype="B"))

     def as_numpy_array(self) -> npt.NDArray[Any]:
         """Returns the buffer as a NumPy array (host memory).

@@ -111,7 +111,7 @@ def __add__(self, other: core.Buffer) -> Self:
         """Concatenate two buffers"""

         other_array = other.as_array_like()
-        assert other_array.dtype == np.dtype("b")
+        assert other_array.dtype == np.dtype("B")
         return self.__class__(
             np.concatenate((np.asanyarray(self._data), np.asanyarray(other_array)))
         )

@@ -131,7 +131,7 @@ class NDBuffer(core.NDBuffer):
     Notes
     -----
     The two buffer classes Buffer and NDBuffer are very similar. In fact, Buffer
-    is a special case of NDBuffer where dim=1, stride=1, and dtype="b". However,
+    is a special case of NDBuffer where dim=1, stride=1, and dtype="B". However,
     in order to use Python's type system to differentiate between the contiguous
     Buffer and the n-dim (non-contiguous) NDBuffer, we keep the definition of the
     two classes separate.
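
Editor's note: from_bytes and concatenation both flow through the uint8 representation now. A minimal usage sketch, assuming zarr's buffer module layout as shown in the diff:

from zarr.core.buffer import cpu

a = cpu.Buffer.from_bytes(b"abc")
b = cpu.Buffer.from_bytes(b"def")
combined = a + b  # __add__ asserts both operands use the unsigned "B" dtype

assert combined.to_bytes() == b"abcdef"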

src/zarr/core/buffer/gpu.py

Lines changed: 5 additions & 5 deletions

@@ -59,7 +59,7 @@ def __init__(self, array_like: ArrayLike) -> None:

         if array_like.ndim != 1:
             raise ValueError("array_like: only 1-dim allowed")
-        if array_like.dtype != np.dtype("b"):
+        if array_like.dtype != np.dtype("B"):
             raise ValueError("array_like: only byte dtype allowed")

         if not hasattr(array_like, "__cuda_array_interface__"):

@@ -84,7 +84,7 @@ def create_zero_length(cls) -> Self:
         -------
         New empty 0-length buffer
         """
-        return cls(cp.array([], dtype="b"))
+        return cls(cp.array([], dtype="B"))

     @classmethod
     def from_buffer(cls, buffer: core.Buffer) -> Self:

@@ -100,14 +100,14 @@ def from_buffer(cls, buffer: core.Buffer) -> Self:

     @classmethod
     def from_bytes(cls, bytes_like: BytesLike) -> Self:
-        return cls.from_array_like(cp.frombuffer(bytes_like, dtype="b"))
+        return cls.from_array_like(cp.frombuffer(bytes_like, dtype="B"))

     def as_numpy_array(self) -> npt.NDArray[Any]:
         return cast(npt.NDArray[Any], cp.asnumpy(self._data))

     def __add__(self, other: core.Buffer) -> Self:
         other_array = other.as_array_like()
-        assert other_array.dtype == np.dtype("b")
+        assert other_array.dtype == np.dtype("B")
         gpu_other = Buffer(other_array)
         gpu_other_array = gpu_other.as_array_like()
         return self.__class__(

@@ -129,7 +129,7 @@ class NDBuffer(core.NDBuffer):
     Notes
     -----
     The two buffer classes Buffer and NDBuffer are very similar. In fact, Buffer
-    is a special case of NDBuffer where dim=1, stride=1, and dtype="b". However,
+    is a special case of NDBuffer where dim=1, stride=1, and dtype="B". However,
     in order to use Python's type system to differentiate between the contiguous
     Buffer and the n-dim (non-contiguous) NDBuffer, we keep the definition of the
     two classes separate.
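
Editor's note: the GPU buffer mirrors the CPU change on CuPy arrays. A tiny sketch, hedged on having a CUDA-capable environment with cupy installed:

import numpy as np
from zarr.core.buffer import gpu

buf = gpu.Buffer.from_bytes(b"abcd")  # device-side copy, now dtype "B"
assert buf.as_numpy_array().dtype == np.dtype("B")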

src/zarr/storage/_fsspec.py

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@

 class FsspecStore(Store):
     """
-    A remote Store based on FSSpec
+    Store for remote data based on FSSpec.

     Parameters
     ----------

src/zarr/storage/_local.py

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ def _put(

 class LocalStore(Store):
     """
-    Local file system store.
+    Store for the local file system.

     Parameters
     ----------

src/zarr/storage/_logging.py

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@

 class LoggingStore(WrapperStore[T_Store]):
     """
-    Store wrapper that logs all calls to the wrapped store.
+    Store that logs all calls to another wrapped store.

     Parameters
     ----------
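
Editor's note: the reworded docstring describes a wrapper pattern: every call is forwarded to the inner store and logged. A minimal usage sketch, assuming both classes are exported from zarr.storage:

from zarr.storage import LoggingStore, MemoryStore

store = LoggingStore(MemoryStore())  # each call on `store` is logged, then delegated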

src/zarr/storage/_memory.py

Lines changed: 5 additions & 3 deletions

@@ -19,7 +19,7 @@

 class MemoryStore(Store):
     """
-    In-memory store.
+    Store for local memory.

     Parameters
     ----------

@@ -173,8 +173,10 @@ async def list_dir(self, prefix: str) -> AsyncIterator[str]:


 class GpuMemoryStore(MemoryStore):
-    """A GPU only memory store that stores every chunk in GPU memory irrespective
-    of the original location.
+    """
+    Store for GPU memory.
+
+    Stores every chunk in GPU memory irrespective of the original location.

     The dictionary of buffers to initialize this memory store with *must* be
     GPU Buffers.
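
Editor's note: a brief construction sketch, assuming both classes are exported from zarr.storage (GpuMemoryStore additionally needs CUDA, and any initial buffers must be GPU buffers):

from zarr.storage import GpuMemoryStore, MemoryStore

cpu_store = MemoryStore()     # chunks live in an in-process dict
gpu_store = GpuMemoryStore()  # every chunk is stored as a GPU buffer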

src/zarr/storage/_obstore.py

Lines changed: 87 additions & 8 deletions

@@ -37,7 +37,8 @@

 class ObjectStore(Store):
-    """A Zarr store that uses obstore for fast read/write from AWS, GCP, Azure.
+    """
+    Store that uses obstore for fast read/write from AWS, GCP, Azure.

     Parameters
     ----------

@@ -106,10 +107,25 @@ async def get(
                 )
                 return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
             elif isinstance(byte_range, SuffixByteRequest):
-                resp = await obs.get_async(
-                    self.store, key, options={"range": {"suffix": byte_range.suffix}}
-                )
-                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+                # some object stores (Azure) don't support suffix requests. In this
+                # case, our workaround is to first get the length of the object and then
+                # manually request the byte range at the end.
+                try:
+                    resp = await obs.get_async(
+                        self.store, key, options={"range": {"suffix": byte_range.suffix}}
+                    )
+                    return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
+                except obs.exceptions.NotSupportedError:
+                    head_resp = await obs.head_async(self.store, key)
+                    file_size = head_resp["size"]
+                    suffix_len = byte_range.suffix
+                    buffer = await obs.get_range_async(
+                        self.store,
+                        key,
+                        start=file_size - suffix_len,
+                        length=suffix_len,
+                    )
+                    return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
             else:
                 raise ValueError(f"Unexpected byte_range, got {byte_range}")
         except _ALLOWED_EXCEPTIONS:

@@ -265,10 +281,29 @@ class _OtherRequest(TypedDict):
     path: str
     """The path to request from."""

-    range: OffsetRange | SuffixRange | None
+    range: OffsetRange | None
+    # Note: suffix requests are handled separately because some object stores (Azure)
+    # don't support them
     """The range request type."""


+class _SuffixRequest(TypedDict):
+    """Offset or suffix range requests.
+
+    These requests cannot be concurrent on the Rust side, and each need their own call
+    to `obstore.get_async`, passing in the `range` parameter.
+    """
+
+    original_request_index: int
+    """The positional index in the original key_ranges input"""
+
+    path: str
+    """The path to request from."""
+
+    range: SuffixRange
+    """The suffix range."""
+
+
 class _Response(TypedDict):
     """A response buffer associated with the original index that it should be restored to."""

@@ -317,7 +352,7 @@ async def _make_other_request(
     prototype: BufferPrototype,
     semaphore: asyncio.Semaphore,
 ) -> list[_Response]:
-    """Make suffix or offset requests.
+    """Make offset or full-file requests.

     We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
     futures can be gathered together.

@@ -339,6 +374,46 @@ async def _make_other_request(
     ]


+async def _make_suffix_request(
+    store: _UpstreamObjectStore,
+    request: _SuffixRequest,
+    prototype: BufferPrototype,
+    semaphore: asyncio.Semaphore,
+) -> list[_Response]:
+    """Make suffix requests.
+
+    This is separated out from `_make_other_request` because some object stores (Azure)
+    don't support suffix requests. In this case, our workaround is to first get the
+    length of the object and then manually request the byte range at the end.
+
+    We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
+    futures can be gathered together.
+    """
+    import obstore as obs
+
+    async with semaphore:
+        try:
+            resp = await obs.get_async(store, request["path"], options={"range": request["range"]})
+            buffer = await resp.bytes_async()
+        except obs.exceptions.NotSupportedError:
+            head_resp = await obs.head_async(store, request["path"])
+            file_size = head_resp["size"]
+            suffix_len = request["range"]["suffix"]
+            buffer = await obs.get_range_async(
+                store,
+                request["path"],
+                start=file_size - suffix_len,
+                length=suffix_len,
+            )
+
+    return [
+        {
+            "original_request_index": request["original_request_index"],
+            "buffer": prototype.buffer.from_bytes(buffer),  # type: ignore[arg-type]
+        }
+    ]
+
+
 async def _get_partial_values(
     store: _UpstreamObjectStore,
     prototype: BufferPrototype,

@@ -358,6 +433,7 @@ async def _get_partial_values(
     key_ranges = list(key_ranges)
     per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list)
     other_requests: list[_OtherRequest] = []
+    suffix_requests: list[_SuffixRequest] = []

     for idx, (path, byte_range) in enumerate(key_ranges):
         if byte_range is None:

@@ -381,7 +457,7 @@ async def _get_partial_values(
                 }
             )
         elif isinstance(byte_range, SuffixByteRequest):
-            other_requests.append(
+            suffix_requests.append(
                 {
                     "original_request_index": idx,
                     "path": path,

@@ -402,6 +478,9 @@ async def _get_partial_values(
     for request in other_requests:
         futs.append(_make_other_request(store, request, prototype, semaphore=semaphore))  # noqa: PERF401

+    for suffix_request in suffix_requests:
+        futs.append(_make_suffix_request(store, suffix_request, prototype, semaphore=semaphore))  # noqa: PERF401
+
     buffers: list[Buffer | None] = [None] * len(key_ranges)

     for responses in await asyncio.gather(*futs):
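
Editor's note: the same fallback appears twice in this commit (in ObjectStore.get and in _make_suffix_request): try a native suffix range first, and on NotSupportedError derive an explicit range from the object's length. A distilled sketch of that pattern, using only the obstore calls shown in the diff above (read_suffix is a hypothetical helper name):

import obstore as obs

async def read_suffix(store, path: str, suffix_len: int) -> bytes:
    """Fetch the last `suffix_len` bytes of `path`, even on stores without suffix ranges."""
    try:
        resp = await obs.get_async(store, path, options={"range": {"suffix": suffix_len}})
        return bytes(await resp.bytes_async())
    except obs.exceptions.NotSupportedError:
        # Azure-style fallback: look up the object size, then request an explicit range.
        file_size = (await obs.head_async(store, path))["size"]
        buf = await obs.get_range_async(store, path, start=file_size - suffix_len, length=suffix_len)
        return bytes(buf)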
