Skip to content

Commit f5c884b

Browse files
committed
update
1 parent c3e7296 commit f5c884b

File tree

2 files changed

+141
-78
lines changed

2 files changed

+141
-78
lines changed

src/zarr/storage/object_store.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from collections import defaultdict
5+
from collections.abc import Iterable
6+
from typing import TYPE_CHECKING, Any
7+
8+
import object_store_rs as obs
9+
10+
from zarr.abc.store import ByteRangeRequest, Store
11+
from zarr.core.buffer import Buffer
12+
from zarr.core.buffer.core import BufferPrototype
13+
14+
if TYPE_CHECKING:
15+
from collections.abc import AsyncGenerator, Coroutine, Iterable
16+
from typing import Any
17+
18+
from object_store_rs.store import ObjectStore as _ObjectStore
19+
20+
from zarr.core.buffer import Buffer, BufferPrototype
21+
from zarr.core.common import BytesLike
22+
23+
24+
class ObjectStore(Store):
    """Zarr store that delegates storage operations to an ``object_store_rs`` store.

    Reads, writes, deletes and listings are forwarded to the module-level
    ``obs`` functions, always passing the wrapped ``store`` as first argument.
    """

    # Underlying object_store_rs handle that every operation is forwarded to.
    store: _ObjectStore

    def __init__(self, store: _ObjectStore) -> None:
        """Wrap an existing ``object_store_rs`` store instance."""
        # NOTE(review): the base ``Store`` initializer is not invoked here; confirm
        # whether ``Store`` expects ``super().__init__()`` to set up its own state.
        self.store = store

    def __str__(self) -> str:
        return f"object://{self.store}"

    def __repr__(self) -> str:
        # Format the wrapped store, not ``self``: ``{self!r}`` would call this
        # method again and recurse until RecursionError.
        return f"ObjectStore({self.store!r})"

    async def get(
        self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
    ) -> Buffer:
        """Fetch the full contents stored under ``key``.

        Parameters
        ----------
        key
            Path of the object inside the wrapped store.
        prototype
            Buffer prototype used to wrap the fetched bytes.
        byte_range
            Must be ``None``; ranged reads are not implemented yet.

        Returns
        -------
        Buffer
            The object's bytes wrapped with ``prototype.buffer``.

        Raises
        ------
        NotImplementedError
            If ``byte_range`` is not ``None``.
        """
        # NOTE(review): a missing key propagates whatever ``obs.get_async`` raises;
        # confirm whether the ``Store`` contract expects ``None`` in that case.
        if byte_range is not None:
            raise NotImplementedError("ObjectStore.get does not support byte_range yet")

        resp = await obs.get_async(self.store, key)
        # Wrap the raw bytes so the declared ``Buffer`` return type is honoured.
        return prototype.buffer.from_bytes(await resp.bytes_async())

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRangeRequest]],
    ) -> list[Buffer | None]:
        """Fetch several byte ranges, possibly spread over several keys.

        Parameters
        ----------
        prototype
            Buffer prototype used to wrap each fetched range.
        key_ranges
            ``(key, byte_range)`` pairs. Both bounds of every range must be
            integers.

        Returns
        -------
        list[Buffer | None]
            One buffer per request, in the same order as ``key_ranges``.

        Raises
        ------
        NotImplementedError
            If any range has a ``None`` bound.
        """
        # TODO: this is a bit hacky and untested. ObjectStore has a `get_ranges` method
        # that will additionally merge nearby ranges, but it's _per_ file. So we need to
        # split these key_ranges into **per-file** key ranges, and then reassemble the
        # results in the original order.
        key_ranges = list(key_ranges)

        # path -> [(offset, length, position in key_ranges)]
        per_file_requests: dict[str, list[tuple[int, int, int]]] = defaultdict(list)
        for idx, (path, range_) in enumerate(key_ranges):
            start, end = range_[0], range_[1]
            if start is None or end is None:
                # Fail with a clear message before any request is issued, instead
                # of an incidental TypeError from ``None`` arithmetic below.
                raise NotImplementedError(
                    "ObjectStore.get_partial_values does not support open-ended byte ranges"
                )
            # NOTE(review): the pair is treated as (start, end), as in the original
            # code; confirm against the definition of ``ByteRangeRequest``.
            per_file_requests[path].append((start, end - start, idx))

        # One ``get_ranges_async`` call per file; all files are fetched concurrently.
        futs: list[Coroutine[Any, Any, list[bytes]]] = [
            obs.get_ranges_async(
                self.store,
                path,
                offsets=[r[0] for r in ranges],
                lengths=[r[1] for r in ranges],
            )
            for path, ranges in per_file_requests.items()
        ]
        result = await asyncio.gather(*futs)

        # Scatter the per-file results back to their original request positions.
        output_buffers: list[Buffer | None] = [None] * len(key_ranges)
        for ranges, buffers in zip(per_file_requests.values(), result, strict=True):
            for (_, _, initial_index), raw in zip(ranges, buffers, strict=True):
                output_buffers[initial_index] = prototype.buffer.from_bytes(raw)

        return output_buffers

    async def exists(self, key: str) -> bool:
        """Return ``True`` if a HEAD request for ``key`` succeeds, ``False`` if it
        raises ``FileNotFoundError``."""
        try:
            await obs.head_async(self.store, key)
        except FileNotFoundError:
            return False
        else:
            return True

    @property
    def supports_writes(self) -> bool:
        return True

    async def set(self, key: str, value: Buffer) -> None:
        """Store the bytes of ``value`` under ``key``."""
        buf = value.to_bytes()
        await obs.put_async(self.store, key, buf)

    # TODO:
    # async def set_if_not_exists(self, key: str, value: Buffer) -> None:

    @property
    def supports_deletes(self) -> bool:
        return True

    async def delete(self, key: str) -> None:
        """Delete the object stored under ``key``."""
        await obs.delete_async(self.store, key)

    @property
    def supports_partial_writes(self) -> bool:
        return False

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        """Not supported (see ``supports_partial_writes``); always raises
        ``NotImplementedError``."""
        raise NotImplementedError

    @property
    def supports_listing(self) -> bool:
        return True

    async def list(self) -> AsyncGenerator[str, None]:
        """Yield the path of every object in the store."""
        # object-store-rs does not yet support list results as an async generator
        # https://github.com/apache/arrow-rs/issues/6587
        # The listing is therefore fetched synchronously and re-yielded, so that
        # callers can use ``async for`` as the return annotation promises.
        objects = obs.list(self.store)
        for meta in objects:
            yield meta["path"]

    async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
        """Yield the path of every object listed under ``prefix``."""
        # object-store-rs does not yet support list results as an async generator
        # https://github.com/apache/arrow-rs/issues/6587
        # The listing is therefore fetched synchronously and re-yielded, so that
        # callers can use ``async for`` as the return annotation promises.
        objects = obs.list(self.store, prefix=prefix)
        for meta in objects:
            yield meta["path"]

    async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
        """Yield the path of every entry in the ``"objects"`` part of a
        delimiter-based listing of ``prefix``."""
        # object-store-rs does not yet support list results as an async generator
        # https://github.com/apache/arrow-rs/issues/6587
        # The listing is therefore fetched synchronously and re-yielded, so that
        # callers can use ``async for`` as the return annotation promises.
        objects = obs.list_with_delimiter(self.store, prefix=prefix)
        for meta in objects["objects"]:
            yield meta["path"]

src/zarr/v3/store/object_store.py

Lines changed: 0 additions & 78 deletions
This file was deleted.

0 commit comments

Comments
 (0)