Skip to content

Commit bfb4397

Browse files
committed
Use async batched _rm() for FsspecStore.delete_dir()
1 parent 11ac3d9 commit bfb4397

File tree

1 file changed

+8
-32
lines changed

1 file changed

+8
-32
lines changed

src/zarr/storage/_fsspec.py

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import warnings
4+
import inspect
45
from typing import TYPE_CHECKING, Any
56

67
from zarr.abc.store import ByteRangeRequest, Store
@@ -262,47 +263,22 @@ async def delete(self, key: str) -> None:
262263
pass
263264

264265
async def delete_dir(self, prefix: str) -> None:
265-
"""
266-
Remove all keys and prefixes in the store that begin with a given prefix.
267-
"""
266+
# docstring inherited
268267
if not self.supports_deletes:
269-
raise NotImplementedError
268+
raise NotImplementedError('This method is only available for stores that support deletes.')
270269
if not self.supports_listing:
271-
raise NotImplementedError
270+
raise NotImplementedError('This method is only available for stores that support directory listing.')
272271
self._check_writable()
273272

274-
if prefix and not prefix.endswith("/"):
275-
prefix += "/"
276-
277-
paths_to_delete = []
278-
async for key in self.list_prefix(prefix):
279-
paths_to_delete.append(_dereference_path(self.path, key))
273+
path_to_delete = _dereference_path(self.path, prefix)
280274

281-
if not paths_to_delete:
282-
return
283-
284-
try:
285-
import s3fs
286-
except ImportError:
287-
s3fs = None
288-
289-
# If s3fs is installed and our filesystem is S3FileSystem, do a bulk delete
290-
if s3fs and isinstance(self.fs, s3fs.S3FileSystem):
275+
if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm):
291276
try:
292-
await self.fs._rm(paths_to_delete)
293-
except FileNotFoundError:
294-
pass
277+
await self.fs._rm(path_to_delete, recursive=True)
295278
except self.allowed_exceptions:
296279
pass
297280
else:
298-
# Otherwise, delete one by one
299-
for path in paths_to_delete:
300-
try:
301-
await self.fs._rm(path)
302-
except FileNotFoundError:
303-
pass
304-
except self.allowed_exceptions:
305-
pass
281+
raise NotImplementedError("The store does not support async deletes")
306282

307283
async def exists(self, key: str) -> bool:
308284
# docstring inherited

0 commit comments

Comments
 (0)