Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ that implements the `AbstractFileSystem` API,
.. code-block:: python

>>> import zarr
>>> store = zarr.storage.RemoteStore("gs://foo/bar", mode="r")
>>> store = zarr.storage.RemoteStore.from_url("gs://foo/bar", mode="r")
>>> zarr.open(store=store)
<Array <RemoteStore(GCSFileSystem, foo/bar)> shape=(10, 20) dtype=float32>

Expand Down
4 changes: 2 additions & 2 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ def with_mode(self, mode: AccessModeLiteral) -> Self:

Parameters
----------
mode: AccessModeLiteral
mode : AccessModeLiteral
The new mode to use.

Returns
-------
store:
store
A new store of the same type with the new mode.

Examples
Expand Down
54 changes: 47 additions & 7 deletions src/zarr/storage/remote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Self

import fsspec
Expand Down Expand Up @@ -35,7 +36,8 @@ class RemoteStore(Store):
mode : AccessModeLiteral
The access mode to use.
path : str
The root path of the store.
The root path of the store. This should be a relative path and must not include the
filesystem scheme.
allowed_exceptions : tuple[type[Exception], ...]
When fetching data, these cases will be deemed to correspond to missing keys.

Expand All @@ -47,6 +49,23 @@ class RemoteStore(Store):
supports_deletes
supports_partial_writes
supports_listing

Raises
------
TypeError
If the Filesystem does not support async operations.
ValueError
If the path argument includes a scheme.

Warns
-----
UserWarning
If the file system (fs) was not created with `asynchronous=True`.

See Also
--------
RemoteStore.from_upath
RemoteStore.from_url
"""

# based on FSSpec
Expand All @@ -72,6 +91,15 @@ def __init__(

if not self.fs.async_impl:
raise TypeError("Filesystem needs to support async operations.")
if not self.fs.asynchronous:
warnings.warn(
f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior",
stacklevel=2,
)
if "://" in path and not path.startswith("http"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could consider also "::", since that is used in chained fsspec URLs, but should not normally end up passed to the FS. However, it is not as special as "://", so I'm ok not to make a special case for it.

# `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
scheme, _ = path.split("://", maxsplit=1)
raise ValueError(f"path argument to RemoteStore must not include scheme ({scheme}://)")

@classmethod
def from_upath(
Expand Down Expand Up @@ -131,13 +159,23 @@ def from_url(
-------
RemoteStore
"""
fs, path = fsspec.url_to_fs(url, **storage_options)
opts = storage_options or {}
opts = {"asynchronous": True, "use_listings_cache": False, **opts}

fs, path = fsspec.url_to_fs(url, **opts)

# fsspec is not consistent about removing the scheme from the path, so check and strip it here
# https://github.com/fsspec/filesystem_spec/issues/1722
if "://" in path and not path.startswith("http"):
# `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
_, path = path.split("://", maxsplit=1)

return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions)

async def clear(self) -> None:
# docstring inherited
try:
for subpath in await self.fs._find(self.path, withdirs=True):
for subpath in await self.fs._find(self.path, withdirs=True, refresh=True):
if subpath != self.path:
await self.fs._rm(subpath, recursive=True)
except FileNotFoundError:
Expand All @@ -149,7 +187,7 @@ async def empty(self) -> bool:
# TODO: it would be nice if we didn't have to list all keys here
# it should be possible to stop after the first key is discovered
try:
return not await self.fs._ls(self.path)
return not await self.fs._ls(self.path, refresh=True)
except FileNotFoundError:
return True

Expand Down Expand Up @@ -283,15 +321,15 @@ async def set_partial_values(

async def list(self) -> AsyncGenerator[str, None]:
# docstring inherited
allfiles = await self.fs._find(self.path, detail=False, withdirs=False)
allfiles = await self.fs._find(self.path, detail=False, withdirs=False, refresh=True)
for onefile in (a.replace(self.path + "/", "") for a in allfiles):
yield onefile

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
prefix = f"{self.path}/{prefix.rstrip('/')}"
try:
allfiles = await self.fs._ls(prefix, detail=False)
allfiles = await self.fs._ls(prefix, detail=False, refresh=True)
except FileNotFoundError:
return
for onefile in (a.replace(prefix + "/", "") for a in allfiles):
Expand All @@ -300,5 +338,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
find_str = f"{self.path}/{prefix}"
for onefile in await self.fs._find(find_str, detail=False, maxdepth=None, withdirs=False):
for onefile in await self.fs._find(
find_str, detail=False, maxdepth=None, withdirs=False, refresh=True
):
yield onefile.removeprefix(find_str)
5 changes: 1 addition & 4 deletions tests/v3/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ async def test_make_store_path(tmpdir: str) -> None:


async def test_make_store_path_fsspec(monkeypatch) -> None:
import fsspec.implementations.memory

monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True)
store_path = await make_store_path("memory://")
store_path = await make_store_path("http://foo.com/bar")
assert isinstance(store_path.store, RemoteStore)


Expand Down
35 changes: 29 additions & 6 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def s3(s3_base: None) -> Generator[s3fs.S3FileSystem, None, None]:

async def test_basic() -> None:
store = RemoteStore.from_url(
f"s3://{test_bucket_name}",
f"s3://{test_bucket_name}/foo/spam/",
mode="w",
storage_options={"endpoint_url": endpoint_url, "anon": False},
)
assert store.fs.asynchronous
assert store.path == f"{test_bucket_name}/foo/spam"
assert await _collect_aiterator(store.list()) == ()
assert not await store.exists("foo")
data = b"hello"
Expand All @@ -109,7 +111,7 @@ class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]):
@pytest.fixture
def store_kwargs(self, request) -> dict[str, str | bool]:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True
)
return {"fs": fs, "path": path, "mode": "r+"}

Expand Down Expand Up @@ -143,9 +145,7 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None:
def test_store_supports_listing(self, store: RemoteStore) -> None:
assert store.supports_listing

async def test_remote_store_from_uri(
self, store: RemoteStore, store_kwargs: dict[str, str | bool]
):
async def test_remote_store_from_uri(self, store: RemoteStore):
storage_options = {
"endpoint_url": endpoint_url,
"anon": False,
Expand Down Expand Up @@ -183,9 +183,32 @@ async def test_remote_store_from_uri(
assert dict(group.attrs) == {"key": "value-3"}

def test_from_upath(self) -> None:
path = UPath(f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False)
path = UPath(
f"s3://{test_bucket_name}/foo/bar/",
endpoint_url=endpoint_url,
anon=False,
asynchronous=True,
)
result = RemoteStore.from_upath(path)
assert result.fs.endpoint_url == endpoint_url
assert result.fs.asynchronous
assert result.path == f"{test_bucket_name}/foo/bar"

def test_init_raises_if_path_has_scheme(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/issues/2342
store_kwargs["path"] = "s3://" + store_kwargs["path"]
with pytest.raises(
ValueError, match="path argument to RemoteStore must not include scheme .*"
):
self.store_cls(**store_kwargs)

def test_init_warns_if_fs_asynchronous_is_false(self) -> None:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=False
)
store_kwargs = {"fs": fs, "path": path, "mode": "r+"}
with pytest.warns(UserWarning, match=r".* was not created with `asynchronous=True`.*"):
self.store_cls(**store_kwargs)

async def test_empty_nonexistent_path(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/pull/2343
Expand Down