Skip to content

Commit 656a565

Browse files
committed
WIP: Support fsspec mutable mapping objects in zarr.open
1 parent 4582998 commit 656a565

File tree

4 files changed

+100
-12
lines changed

4 files changed

+100
-12
lines changed

src/zarr/storage/_common.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,27 @@ async def make_store_path(
312312
# By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate.
313313
store = await MemoryStore.open(store_dict=store_like, read_only=_read_only)
314314
else:
315-
msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable]
316-
raise TypeError(msg)
315+
try: # type: ignore[unreachable]
316+
import fsspec
317+
318+
if isinstance(store_like, fsspec.mapping.FSMap):
319+
if path:
320+
raise TypeError(
321+
"'path' was provided but is not used for FSMap store_like objects"
322+
)
323+
if storage_options:
324+
raise TypeError(
325+
"'storage_options was provided but is not used for FSMap store_like objects"
326+
)
327+
store = FsspecStore.from_mapper(store_like, read_only=_read_only)
328+
else:
329+
raise (
330+
TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")
331+
)
332+
except ImportError:
333+
raise (
334+
TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")
335+
) from None
317336

318337
result = await StorePath.open(store, path=path_normalized, mode=mode)
319338

src/zarr/storage/_fsspec.py

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
if TYPE_CHECKING:
1717
from collections.abc import AsyncIterator, Iterable
1818

19+
from fsspec import AbstractFileSystem
1920
from fsspec.asyn import AsyncFileSystem
21+
from fsspec.mapping import FSMap
2022

2123
from zarr.core.buffer import BufferPrototype
2224
from zarr.core.common import BytesLike
@@ -29,6 +31,20 @@
2931
)
3032

3133

34+
def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem:
    """
    Wrap a filesystem in fsspec's ``AsyncFileSystemWrapper``.

    Parameters
    ----------
    fs : AbstractFileSystem
        The filesystem to wrap. The error message below describes it as
        synchronous; this function does not check that itself.

    Returns
    -------
    AsyncFileSystem
        ``AsyncFileSystemWrapper(fs)``.

    Raises
    ------
    ImportError
        If ``AsyncFileSystemWrapper`` cannot be imported from
        ``fsspec.implementations.asyn_wrapper``.
    """
    # Guard only the import: an ImportError raised while constructing the
    # wrapper is a different failure and must not be reported as an
    # outdated fsspec installation.
    try:
        from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
    except ImportError as e:
        raise ImportError(
            f"The filesystem '{fs}' is synchronous, and the required "
            "AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
            "2024.12.0 or later to enable this functionality."
        ) from e
    return AsyncFileSystemWrapper(fs)
46+
47+
3248
class FsspecStore(Store):
3349
"""
3450
A remote Store based on FSSpec
@@ -136,6 +152,37 @@ def from_upath(
136152
allowed_exceptions=allowed_exceptions,
137153
)
138154

155+
@classmethod
def from_mapper(
    cls,
    fs_map: FSMap,
    read_only: bool = False,
    allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
    """
    Create a FsspecStore from an fsspec FSMap object.

    Parameters
    ----------
    fs_map : FSMap
        The mapper whose ``fs`` and ``root`` attributes are passed on as
        the store's filesystem and path.
    read_only : bool
        Whether the store is read-only, defaults to False.
    allowed_exceptions : tuple, optional
        The exceptions that are allowed to be raised when accessing the
        store. Defaults to ALLOWED_EXCEPTIONS.

    Returns
    -------
    FsspecStore

    Raises
    ------
    TypeError
        If the mapper's filesystem has a falsy ``async_impl`` or a falsy
        ``asynchronous`` attribute.
    """
    filesystem = fs_map.fs
    # Both flags must be set; no sync -> async conversion is attempted here.
    if not (filesystem.async_impl and filesystem.asynchronous):
        raise TypeError("Filesystem needs to support async operations.")
    return cls(
        fs=filesystem,
        path=fs_map.root,
        read_only=read_only,
        allowed_exceptions=allowed_exceptions,
    )
185+
139186
@classmethod
140187
def from_url(
141188
cls,
@@ -174,16 +221,7 @@ def from_url(
174221

175222
fs, path = url_to_fs(url, **opts)
176223
if not fs.async_impl:
177-
try:
178-
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
179-
180-
fs = AsyncFileSystemWrapper(fs)
181-
except ImportError as e:
182-
raise ImportError(
183-
f"The filesystem for URL '{url}' is synchronous, and the required "
184-
"AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
185-
"2024.12.0 or later to enable this functionality."
186-
) from e
224+
fs = _make_async(fs)
187225

188226
# fsspec is not consistent about removing the scheme from the path, so check and strip it here
189227
# https://github.com/fsspec/filesystem_spec/issues/1722

tests/test_api.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,23 @@ def test_open_with_mode_w_minus(tmp_path: pathlib.Path) -> None:
288288
zarr.open(store=tmp_path, mode="w-")
289289

290290

291+
@pytest.mark.xfail(
    reason="Automatic sync -> async filesystems not implemented yet for FSMap objects."
)
def test_open_fsmap_file(tmp_path: pathlib.Path) -> None:
    """Open an array through a local-file FSMap twice with mode="w"."""
    fsspec = pytest.importorskip("fsspec")
    local_fs = fsspec.filesystem("file")
    fs_map = local_fs.get_mapper(tmp_path)

    first = zarr.open(store=fs_map, mode="w", shape=(3, 3))
    assert isinstance(first, Array)
    first[...] = 3

    # The second mode="w" open must not still show the values written above.
    second = zarr.open(store=fs_map, mode="w", shape=(3, 3))
    assert isinstance(second, Array)
    assert not (second[:] == 3).all()
    second[:] = 3
307+
291308
@pytest.mark.parametrize("zarr_format", [2, 3])
292309
def test_array_order(zarr_format: ZarrFormat) -> None:
293310
arr = zarr.ones(shape=(2, 2), order=None, zarr_format=zarr_format)

tests/test_store/test_fsspec.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from packaging.version import parse as parse_version
1010

1111
import zarr.api.asynchronous
12+
from zarr import Array
1213
from zarr.abc.store import OffsetByteRequest
1314
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
1415
from zarr.core.sync import _collect_aiterator, sync
@@ -104,6 +105,19 @@ async def test_basic() -> None:
104105
assert out[0].to_bytes() == data[1:]
105106

106107

108+
def test_open_s3map() -> None:
    """Open an array through an S3 FSMap twice with mode="w"."""
    async_s3 = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False)
    fs_map = async_s3.get_mapper(f"s3://{test_bucket_name}/map/foo/")

    first = zarr.open(store=fs_map, mode="w", shape=(3, 3))
    assert isinstance(first, Array)
    first[...] = 3

    # The second mode="w" open must not still show the values written above.
    second = zarr.open(store=fs_map, mode="w", shape=(3, 3))
    assert isinstance(second, Array)
    assert not (second[:] == 3).all()
    second[:] = 3
119+
120+
107121
class TestFsspecStoreS3(StoreTests[FsspecStore, cpu.Buffer]):
108122
store_cls = FsspecStore
109123
buffer_cls = cpu.Buffer

0 commit comments

Comments
 (0)