Skip to content

Commit b70134a

Browse files
authored
Merge branch 'main' into store-docstrings
2 parents 5544885 + a0761ac commit b70134a

File tree

1 file changed

+85
-7
lines changed

1 file changed

+85
-7
lines changed

src/zarr/storage/_obstore.py

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,25 @@ async def get(
107107
)
108108
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
109109
elif isinstance(byte_range, SuffixByteRequest):
110-
resp = await obs.get_async(
111-
self.store, key, options={"range": {"suffix": byte_range.suffix}}
112-
)
113-
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
110+
# some object stores (Azure) don't support suffix requests. In this
111+
# case, our workaround is to first get the length of the object and then
112+
# manually request the byte range at the end.
113+
try:
114+
resp = await obs.get_async(
115+
self.store, key, options={"range": {"suffix": byte_range.suffix}}
116+
)
117+
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
118+
except obs.exceptions.NotSupportedError:
119+
head_resp = await obs.head_async(self.store, key)
120+
file_size = head_resp["size"]
121+
suffix_len = byte_range.suffix
122+
buffer = await obs.get_range_async(
123+
self.store,
124+
key,
125+
start=file_size - suffix_len,
126+
length=suffix_len,
127+
)
128+
return prototype.buffer.from_bytes(buffer) # type: ignore[arg-type]
114129
else:
115130
raise ValueError(f"Unexpected byte_range, got {byte_range}")
116131
except _ALLOWED_EXCEPTIONS:
@@ -266,10 +281,29 @@ class _OtherRequest(TypedDict):
266281
path: str
267282
"""The path to request from."""
268283

269-
range: OffsetRange | SuffixRange | None
284+
range: OffsetRange | None
285+
# Note: suffix requests are handled separately because some object stores (Azure)
286+
# don't support them
270287
"""The range request type."""
271288

272289

290+
class _SuffixRequest(TypedDict):
291+
"""Offset or suffix range requests.
292+
293+
These requests cannot be concurrent on the Rust side, and each need their own call
294+
to `obstore.get_async`, passing in the `range` parameter.
295+
"""
296+
297+
original_request_index: int
298+
"""The positional index in the original key_ranges input"""
299+
300+
path: str
301+
"""The path to request from."""
302+
303+
range: SuffixRange
304+
"""The suffix range."""
305+
306+
273307
class _Response(TypedDict):
274308
"""A response buffer associated with the original index that it should be restored to."""
275309

@@ -318,7 +352,7 @@ async def _make_other_request(
318352
prototype: BufferPrototype,
319353
semaphore: asyncio.Semaphore,
320354
) -> list[_Response]:
321-
"""Make suffix or offset requests.
355+
"""Make offset or full-file requests.
322356
323357
We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
324358
futures can be gathered together.
@@ -340,6 +374,46 @@ async def _make_other_request(
340374
]
341375

342376

377+
async def _make_suffix_request(
    store: _UpstreamObjectStore,
    request: _SuffixRequest,
    prototype: BufferPrototype,
    semaphore: asyncio.Semaphore,
) -> list[_Response]:
    """Make a single suffix request.

    This is separated out from `_make_other_request` because some object stores (Azure)
    don't support suffix requests. In this case, our workaround is to first get the
    length of the object and then manually request the byte range at the end.

    Parameters
    ----------
    store
        The obstore store to read from.
    request
        The suffix request, carrying the path, the suffix range and the index of the
        request in the original ``key_ranges`` input.
    prototype
        The buffer prototype used to wrap the fetched bytes.
    semaphore
        Held for the duration of the request, including the fallback calls.

    Returns
    -------
    list[_Response]
        A single-element list, for symmetry with `_make_bounded_requests` so that all
        futures can be gathered together.
    """
    import obstore as obs

    path = request["path"]

    async with semaphore:
        try:
            resp = await obs.get_async(store, path, options={"range": request["range"]})
            buffer = await resp.bytes_async()
        except obs.exceptions.NotSupportedError:
            # The store rejected the suffix range: look up the object size and turn the
            # suffix into an explicit (start, length) range instead.
            head_resp = await obs.head_async(store, path)
            file_size = head_resp["size"]
            suffix_len = request["range"]["suffix"]
            # A suffix longer than the object means "the whole object"; clamp so that
            # we never ask for a negative start offset.
            start = max(0, file_size - suffix_len)
            buffer = await obs.get_range_async(
                store,
                path,
                start=start,
                length=file_size - start,
            )

    return [
        {
            "original_request_index": request["original_request_index"],
            "buffer": prototype.buffer.from_bytes(buffer),  # type: ignore[arg-type]
        }
    ]
415+
416+
343417
async def _get_partial_values(
344418
store: _UpstreamObjectStore,
345419
prototype: BufferPrototype,
@@ -359,6 +433,7 @@ async def _get_partial_values(
359433
key_ranges = list(key_ranges)
360434
per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list)
361435
other_requests: list[_OtherRequest] = []
436+
suffix_requests: list[_SuffixRequest] = []
362437

363438
for idx, (path, byte_range) in enumerate(key_ranges):
364439
if byte_range is None:
@@ -382,7 +457,7 @@ async def _get_partial_values(
382457
}
383458
)
384459
elif isinstance(byte_range, SuffixByteRequest):
385-
other_requests.append(
460+
suffix_requests.append(
386461
{
387462
"original_request_index": idx,
388463
"path": path,
@@ -403,6 +478,9 @@ async def _get_partial_values(
403478
for request in other_requests:
404479
futs.append(_make_other_request(store, request, prototype, semaphore=semaphore)) # noqa: PERF401
405480

481+
for suffix_request in suffix_requests:
482+
futs.append(_make_suffix_request(store, suffix_request, prototype, semaphore=semaphore)) # noqa: PERF401
483+
406484
buffers: list[Buffer | None] = [None] * len(key_ranges)
407485

408486
for responses in await asyncio.gather(*futs):

0 commit comments

Comments
 (0)