|
4 | 4 | import contextlib |
5 | 5 | import pickle |
6 | 6 | from collections import defaultdict |
7 | | -from typing import TYPE_CHECKING, TypedDict |
| 7 | +from typing import TYPE_CHECKING, TypedDict, TypeVar |
8 | 8 |
|
9 | 9 | from zarr.abc.store import ( |
10 | 10 | ByteRequest, |
|
16 | 16 | from zarr.core.config import config |
17 | 17 |
|
18 | 18 | if TYPE_CHECKING: |
19 | | - from collections.abc import AsyncGenerator, Coroutine, Iterable |
| 19 | + from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence |
20 | 20 | from typing import Any |
21 | 21 |
|
22 | 22 | from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange |
|
27 | 27 |
|
28 | 28 | __all__ = ["ObjectStore"] |
29 | 29 |
|
| 30 | +T = TypeVar("T") |
| 31 | + |
30 | 32 | _ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( |
31 | 33 | FileNotFoundError, |
32 | 34 | IsADirectoryError, |
@@ -212,41 +214,55 @@ def supports_listing(self) -> bool: |
212 | 214 | # docstring inherited |
213 | 215 | return True |
214 | 216 |
|
215 | | - def list(self) -> AsyncGenerator[str, None]: |
216 | | - # docstring inherited |
| 217 | + def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]: |
217 | 218 | import obstore as obs |
218 | 219 |
|
219 | | - objects: ListStream[list[ObjectMeta]] = obs.list(self.store) |
220 | | - return _transform_list(objects) |
| 220 | + objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix) |
| 221 | + return (obj async for obj in _transform_list(objects)) |
221 | 222 |
|
222 | | - def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: |
| 223 | + def list(self) -> AsyncGenerator[str, None]: |
223 | 224 | # docstring inherited |
224 | | - import obstore as obs |
| 225 | + return (obj["path"] async for obj in self._list()) |
225 | 226 |
|
226 | | - objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) |
227 | | - return _transform_list(objects) |
| 227 | + def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: |
| 228 | + # docstring inherited |
| 229 | + return (obj["path"] async for obj in self._list(prefix)) |
228 | 230 |
|
229 | 231 | def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: |
230 | 232 | # docstring inherited |
231 | 233 | import obstore as obs |
232 | 234 |
|
233 | | - coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) |
| 235 | + coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]] = ( |
| 236 | + obs.list_with_delimiter_async(self.store, prefix=prefix) |
| 237 | + ) |
234 | 238 | return _transform_list_dir(coroutine, prefix) |
235 | 239 |
|
| 240 | + async def getsize(self, key: str) -> int: |
| 241 | + # docstring inherited |
| 242 | + import obstore as obs |
| 243 | + |
| 244 | + resp = await obs.head_async(self.store, key) |
| 245 | + return resp["size"] |
| 246 | + |
| 247 | + async def getsize_prefix(self, prefix: str) -> int: |
| 248 | + # docstring inherited |
| 249 | + sizes = [obj["size"] async for obj in self._list(prefix=prefix)] |
| 250 | + return sum(sizes) |
| 251 | + |
236 | 252 |
|
237 | 253 | async def _transform_list( |
238 | | - list_stream: ListStream[list[ObjectMeta]], |
239 | | -) -> AsyncGenerator[str, None]: |
| 254 | + list_stream: ListStream[Sequence[ObjectMeta]], |
| 255 | +) -> AsyncGenerator[ObjectMeta, None]: |
240 | 256 | """ |
241 | | - Transform the result of list into an async generator of paths. |
| 257 | + Transform the result of list into an async generator of ObjectMeta dicts. |
242 | 258 | """ |
243 | 259 | async for batch in list_stream: |
244 | 260 | for item in batch: |
245 | | - yield item["path"] |
| 261 | + yield item |
246 | 262 |
|
247 | 263 |
|
248 | 264 | async def _transform_list_dir( |
249 | | - list_result_coroutine: Coroutine[Any, Any, ListResult[list[ObjectMeta]]], prefix: str |
| 265 | + list_result_coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]], prefix: str |
250 | 266 | ) -> AsyncGenerator[str, None]: |
251 | 267 | """ |
252 | 268 | Transform the result of list_with_delimiter into an async generator of paths. |
|
0 commit comments