Skip to content

Commit af2a39b

Browse files
committed
Handle list streams
1 parent f5c884b commit af2a39b

File tree

1 file changed

+34 -22 lines changed

1 file changed

+34 -22 lines changed

src/zarr/storage/object_store.py

Lines changed: 34 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from collections.abc import Iterable
66
from typing import TYPE_CHECKING, Any
77

8-
import object_store_rs as obs
8+
import obstore as obs
99

1010
from zarr.abc.store import ByteRangeRequest, Store
1111
from zarr.core.buffer import Buffer
@@ -15,7 +15,8 @@
1515
from collections.abc import AsyncGenerator, Coroutine, Iterable
1616
from typing import Any
1717

18-
from object_store_rs.store import ObjectStore as _ObjectStore
18+
from obstore import ListStream, ObjectMeta
19+
from obstore.store import ObjectStore as _ObjectStore
1920

2021
from zarr.core.buffer import Buffer, BufferPrototype
2122
from zarr.core.common import BytesLike
@@ -93,8 +94,9 @@ async def set(self, key: str, value: Buffer) -> None:
9394
buf = value.to_bytes()
9495
await obs.put_async(self.store, key, buf)
9596

96-
# TODO:
97-
# async def set_if_not_exists(self, key: str, value: Buffer) -> None:
97+
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
98+
buf = value.to_bytes()
99+
await obs.put_async(self.store, key, buf, mode="create")
98100

99101
@property
100102
def supports_deletes(self) -> bool:
@@ -117,25 +119,35 @@ def supports_listing(self) -> bool:
117119
return True
118120

119121
def list(self) -> AsyncGenerator[str, None]:
120-
# object-store-rs does not yet support list results as an async generator
121-
# https://github.com/apache/arrow-rs/issues/6587
122-
objects = obs.list(self.store)
123-
paths = [object["path"] for object in objects]
124-
# Not sure how to convert list to async generator
125-
return paths
122+
objects: ListStream[list[ObjectMeta]] = obs.list(self.store)
123+
return _transform_list(objects)
126124

127125
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
128-
# object-store-rs does not yet support list results as an async generator
129-
# https://github.com/apache/arrow-rs/issues/6587
130-
objects = obs.list(self.store, prefix=prefix)
131-
paths = [object["path"] for object in objects]
132-
# Not sure how to convert list to async generator
133-
return paths
126+
objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix)
127+
return _transform_list(objects)
134128

135129
def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
136-
# object-store-rs does not yet support list results as an async generator
137-
# https://github.com/apache/arrow-rs/issues/6587
138-
objects = obs.list_with_delimiter(self.store, prefix=prefix)
139-
paths = [object["path"] for object in objects["objects"]]
140-
# Not sure how to convert list to async generator
141-
return paths
130+
objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix)
131+
return _transform_list_dir(objects, prefix)
132+
133+
134+
async def _transform_list(
135+
list_stream: AsyncGenerator[list[ObjectMeta], None],
136+
) -> AsyncGenerator[str, None]:
137+
async for batch in list_stream:
138+
for item in batch:
139+
yield item["path"]
140+
141+
142+
async def _transform_list_dir(
143+
list_stream: AsyncGenerator[list[ObjectMeta], None], prefix: str
144+
) -> AsyncGenerator[str, None]:
145+
# We assume that the underlying object-store implementation correctly handles the
146+
# prefix, so we don't double-check that the returned results actually start with the
147+
# given prefix.
148+
prefix_len = len(prefix)
149+
async for batch in list_stream:
150+
for item in batch:
151+
# Yield this item if "/" does not exist after the prefix.
152+
if "/" not in item["path"][prefix_len:]:
153+
yield item["path"]

0 commit comments

Comments
 (0)