Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ def with_mode(self, mode: AccessModeLiteral) -> Self:

Parameters
----------
mode: AccessModeLiteral
mode : AccessModeLiteral
The new mode to use.

Returns
-------
store:
store
A new store of the same type with the new mode.

Examples
Expand Down
64 changes: 56 additions & 8 deletions src/zarr/storage/remote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Self

import fsspec
Expand Down Expand Up @@ -42,15 +43,45 @@ def __init__(
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> None:
"""
A remote Store based on FSSpec

Parameters
----------
url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to".
Can also be a upath.UPath instance/
allowed_exceptions: when fetching data, these cases will be deemed to correspond to missing
keys, rather than some other IO failure
storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath,
this must not be used.

fs : AsyncFileSystem
The Async FSSpec filesystem to use with this store.
mode : AccessModeLiteral
The access mode to use.
path : str
The root path of the store. This should be a relative path and must not include the
filesystem scheme.
allowed_exceptions : tuple[type[Exception], ...]
When fetching data, these cases will be deemed to correspond to missing keys.

Attributes
----------
fs
allowed_exceptions
supports_writes
supports_deletes
supports_partial_writes
supports_listing

Raises
------
TypeError
If the Filesystem does not support async operations.
ValueError
If the path argument includes a scheme.

Warns
-----
UserWarning
If the file system (fs) was not created with `asynchronous=True`.

See Also
--------
RemoteStore.from_upath
RemoteStore.from_url
"""
super().__init__(mode=mode)
self.fs = fs
Expand All @@ -59,6 +90,14 @@ def __init__(

if not self.fs.async_impl:
raise TypeError("Filesystem needs to support async operations.")
if not self.fs.asynchronous:
warnings.warn(
f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior",
stacklevel=2,
)
if "://" in path:
scheme, _ = path.split("://", maxsplit=1)
raise ValueError(f"path argument to RemoteStore must not include scheme ({scheme}://)")

@classmethod
def from_upath(
Expand All @@ -82,7 +121,16 @@ def from_url(
mode: AccessModeLiteral = "r",
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> RemoteStore:
fs, path = fsspec.url_to_fs(url, **storage_options)
opts = storage_options or {}
opts = {"asynchronous": True, **opts}

fs, path = fsspec.url_to_fs(url, **opts)

# fsspec is not consistent about removing the scheme from the path, so check and strip it here
# https://github.com/fsspec/filesystem_spec/issues/1722
if "://" in path:
_, path = path.split("://", maxsplit=1)

return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions)

async def clear(self) -> None:
Expand Down
5 changes: 1 addition & 4 deletions tests/v3/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ async def test_make_store_path(tmpdir: str) -> None:


async def test_make_store_path_fsspec(monkeypatch) -> None:
import fsspec.implementations.memory

monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True)
store_path = await make_store_path("memory://")
store_path = await make_store_path("http://foo.com/bar")
assert isinstance(store_path.store, RemoteStore)


Expand Down
35 changes: 29 additions & 6 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def s3(s3_base: None) -> Generator[s3fs.S3FileSystem, None, None]:

async def test_basic() -> None:
store = RemoteStore.from_url(
f"s3://{test_bucket_name}",
f"s3://{test_bucket_name}/foo/spam/",
mode="w",
storage_options={"endpoint_url": endpoint_url, "anon": False},
)
assert store.fs.asynchronous
assert store.path == f"{test_bucket_name}/foo/spam"
assert await _collect_aiterator(store.list()) == ()
assert not await store.exists("foo")
data = b"hello"
Expand All @@ -109,7 +111,7 @@ class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]):
@pytest.fixture
def store_kwargs(self, request) -> dict[str, str | bool]:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True
)
return {"fs": fs, "path": path, "mode": "r+"}

Expand Down Expand Up @@ -143,9 +145,7 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None:
def test_store_supports_listing(self, store: RemoteStore) -> None:
assert store.supports_listing

async def test_remote_store_from_uri(
self, store: RemoteStore, store_kwargs: dict[str, str | bool]
):
async def test_remote_store_from_uri(self, store: RemoteStore):
storage_options = {
"endpoint_url": endpoint_url,
"anon": False,
Expand Down Expand Up @@ -183,6 +183,29 @@ async def test_remote_store_from_uri(
assert dict(group.attrs) == {"key": "value-3"}

def test_from_upath(self) -> None:
path = UPath(f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False)
path = UPath(
f"s3://{test_bucket_name}/foo/bar/",
endpoint_url=endpoint_url,
anon=False,
asynchronous=True,
)
result = RemoteStore.from_upath(path)
assert result.fs.endpoint_url == endpoint_url
assert result.fs.asynchronous
assert result.path == f"{test_bucket_name}/foo/bar"

def test_init_raises_if_path_has_scheme(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/issues/2342
store_kwargs["path"] = "s3://" + store_kwargs["path"]
with pytest.raises(
ValueError, match="path argument to RemoteStore must not include scheme .*"
):
self.store_cls(**store_kwargs)

def test_init_warns_if_fs_asynchronous_is_false(self) -> None:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=False
)
store_kwargs = {"fs": fs, "path": path, "mode": "r+"}
with pytest.warns(UserWarning, match=r".* was not created with `asynchronous=True`.*"):
self.store_cls(**store_kwargs)