Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions fsspec/implementations/asyn_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import functools
import inspect

from fsspec.asyn import AsyncFileSystem
from fsspec.asyn import AsyncFileSystem, running_async


def async_wrapper(func, obj=None):
Expand Down Expand Up @@ -42,10 +42,14 @@ class AsyncFileSystemWrapper(AsyncFileSystem):
The synchronous filesystem instance to wrap.
"""

def __init__(self, sync_fs, *args, **kwargs):
super().__init__(*args, **kwargs)
self.asynchronous = True
self.sync_fs = sync_fs
protocol = "async_wrapper"
cachable = False

def __init__(self, fs, *args, asynchronous=None, **kwargs):
if asynchronous is None:
asynchronous = running_async()
super().__init__(*args, asynchronous=asynchronous, **kwargs)
self.sync_fs = fs
self.protocol = self.sync_fs.protocol
self._wrap_all_sync_methods()

Expand Down
2 changes: 0 additions & 2 deletions fsspec/implementations/dirfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ def __init__(
super().__init__(**storage_options)
if fs is None:
fs = filesystem(protocol=target_protocol, **(target_options or {}))
if (path is not None) ^ (fo is not None) is False:
raise ValueError("Provide path or fo, not both")
path = path or fo

if self.asynchronous and not fs.async_impl:
Expand Down
21 changes: 20 additions & 1 deletion fsspec/implementations/tests/test_asyn_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
from .test_local import csv_files, filetexts


def test_is_async():
@pytest.mark.asyncio
async def test_is_async_default():
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)
assert async_fs.async_impl
assert async_fs.asynchronous
async_fs = AsyncFileSystemWrapper(fs, asynchronous=False)
assert not async_fs.asynchronous


def test_class_wrapper():
Expand Down Expand Up @@ -53,6 +57,7 @@ async def test_cats():
assert result == b"a,b\n1,2\n"[1:-2]

# test synchronous API is available as expected
async_fs = AsyncFileSystemWrapper(fs, asynchronous=False)
result = async_fs.cat(".test.fakedata.1.csv", start=1, end=-2)
assert result == b"a,b\n1,2\n"[1:-2]

Expand Down Expand Up @@ -142,3 +147,17 @@ async def test_batch_operations():
await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
assert not await async_fs._exists(".test.fakedata.1.csv")
assert not await async_fs._exists(".test.fakedata.2.csv")


def test_open(tmpdir):
fn = f"{tmpdir}/afile"
with open(fn, "wb") as f:
f.write(b"hello")
of = fsspec.open(
"dir://afile::async_wrapper::file",
mode="rb",
async_wrapper={"asynchronous": False},
dir={"path": str(tmpdir)},
)
with of as f:
assert f.read() == b"hello"
4 changes: 2 additions & 2 deletions fsspec/implementations/tests/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ def test_fss_has_defaults(m):
assert fs.fss[None] is m

fs = fsspec.filesystem("reference", fo={"key": ["memory://a"]})
assert fs.fss[None] is fs.fss["memory"]
assert fs.fss[None] == fs.fss["memory"]

fs = fsspec.filesystem("reference", fo={"key": ["memory://a"], "blah": ["path"]})
assert fs.fss[None] is fs.fss["memory"]
assert fs.fss[None] == fs.fss["memory"]


def test_merging(m):
Expand Down
3 changes: 3 additions & 0 deletions fsspec/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def register_implementation(name, cls, clobber=False, errtxt=None):
"class": "fsspec.implementations.arrow.HadoopFileSystem",
"err": "pyarrow and local java libraries required for HDFS",
},
"async_wrapper": {
"class": "morefs.asyn_wrapper.AsyncWrapperFileSystem",
},
"asynclocal": {
"class": "morefs.asyn_local.AsyncLocalFileSystem",
"err": "Install 'morefs[asynclocalfs]' to use AsyncLocalFileSystem",
Expand Down
Loading