Skip to content

Commit 38d3cdb

Browse files
committed
Add async wrapper for sync FS
1 parent 9a16171 commit 38d3cdb

File tree

2 files changed

+211
-0
lines changed

2 files changed

+211
-0
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import asyncio
2+
import functools
3+
from fsspec.asyn import AsyncFileSystem
4+
5+
6+
def async_wrapper(func, obj=None):
    """
    Wraps a synchronous function to make it awaitable.

    Parameters
    ----------
    func : callable
        The synchronous function to wrap.
    obj : object, optional
        The instance to bind the function to, if applicable. The wrapper
        does not use it; it is accepted so existing callers keep working.

    Returns
    -------
    coroutine function
        An awaitable version of the function. Calling it forwards all
        positional and keyword arguments to ``func`` unchanged.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Delegate the blocking call to a worker thread via asyncio.to_thread.
        # No peeking at args[0]: the wrapped callable may legitimately be
        # invoked with keyword arguments only (or with no arguments at all).
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper
27+
28+
29+
class AsyncFileSystemWrapper(AsyncFileSystem):
    """
    A wrapper class to convert a synchronous filesystem into an asynchronous one.

    This class takes an existing synchronous filesystem implementation and wraps all
    its public methods to provide an asynchronous interface.

    Parameters
    ----------
    sync_fs : AbstractFileSystem
        The synchronous filesystem instance to wrap.
    *args, **kwargs
        Forwarded unchanged to ``AsyncFileSystem.__init__``.
    """

    def __init__(self, sync_fs, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fs = sync_fs
        self._wrap_all_sync_methods()

    def _wrap_all_sync_methods(self):
        """
        Wrap all synchronous methods of the underlying filesystem with asynchronous versions.

        Every public callable ``name`` found on ``self.fs`` that is not already
        a coroutine function is exposed on this instance as ``_name``, an
        awaitable produced by ``async_wrapper``.
        """
        # Imported locally so this class does not depend on a module-level import.
        import inspect

        for method_name in dir(self.fs):
            if method_name.startswith("_"):
                continue
            # Inspect the attribute without triggering the descriptor protocol:
            # a plain getattr() would evaluate properties on the wrapped
            # filesystem, running arbitrary code as a side effect.
            static_attr = inspect.getattr_static(self.fs, method_name, None)
            if isinstance(static_attr, property):
                continue
            method = getattr(self.fs, method_name)
            # inspect.iscoroutinefunction replaces the asyncio variant, which
            # is deprecated as of Python 3.14.
            if callable(method) and not inspect.iscoroutinefunction(method):
                # NOTE(review): this also installs ``_open`` as an async wrapper
                # around the sync ``open``; confirm the inherited synchronous
                # ``open`` is not expected to call ``_open`` on this instance.
                async_method = async_wrapper(method, obj=self)
                setattr(self, f"_{method_name}", async_method)

    @classmethod
    def wrap_class(cls, sync_fs_class):
        """
        Create a new class that can be used to instantiate an AsyncFileSystemWrapper
        with lazy instantiation of the underlying synchronous filesystem.

        Parameters
        ----------
        sync_fs_class : type
            The class of the synchronous filesystem to wrap.

        Returns
        -------
        type
            A new class that wraps the provided synchronous filesystem class.
            Its ``__name__`` is ``Async<sync_fs_class.__name__>Wrapper``.
        """

        class GeneratedAsyncFileSystemWrapper(cls):
            def __init__(self, *args, **kwargs):
                # The sync filesystem is only built when the wrapper itself is
                # instantiated; all constructor arguments go to it.
                sync_fs = sync_fs_class(*args, **kwargs)
                super().__init__(sync_fs)

        GeneratedAsyncFileSystemWrapper.__name__ = f"Async{sync_fs_class.__name__}Wrapper"
        return GeneratedAsyncFileSystemWrapper
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import asyncio
2+
import pytest
3+
import os
4+
5+
import fsspec
6+
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
7+
from fsspec.implementations.local import LocalFileSystem
8+
from .test_local import csv_files, filetexts
9+
10+
11+
def test_is_async():
    """A wrapper built around the ``"file"`` filesystem reports ``async_impl``."""
    fs = fsspec.filesystem("file")
    async_fs = AsyncFileSystemWrapper(fs)
    # Truthiness check instead of ``== True`` (PEP 8: don't compare to True with ==).
    assert async_fs.async_impl
15+
16+
17+
def test_class_wrapper():
    """``wrap_class`` yields a named class whose instances report ``async_impl``."""
    fs_cls = LocalFileSystem
    async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls)
    assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper"
    async_fs = async_fs_cls()
    # Truthiness check instead of ``== True`` (PEP 8: don't compare to True with ==).
    assert async_fs.async_impl
23+
24+
25+
@pytest.mark.asyncio
async def test_cats():
    """Check ``_cat`` for single paths, lists of paths and byte ranges."""
    first_path = ".test.fakedata.1.csv"
    second_path = ".test.fakedata.2.csv"
    first_bytes = b"a,b\n1,2\n"
    second_bytes = b"a,b\n3,4\n"

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        # Whole file, single path.
        assert await wrapped._cat(first_path) == first_bytes

        # Several paths at once: a mapping comes back, compare its values.
        both = await wrapped._cat([first_path, second_path])
        assert set(both.values()) == {first_bytes, second_bytes}

        # Explicit ``None`` bounds passed positionally.
        assert await wrapped._cat(first_path, None, None) == first_bytes

        # Byte ranges, including negative offsets.
        ranged = await wrapped._cat(first_path, start=1, end=6)
        assert ranged == first_bytes[1:6]

        tail = await wrapped._cat(first_path, start=-1)
        assert tail == first_bytes[-1:]

        middle = await wrapped._cat(first_path, start=1, end=-2)
        assert middle == first_bytes[1:-2]

        # test synchronous API is available as expected
        sync_middle = wrapped.cat(first_path, start=1, end=-2)
        assert sync_middle == first_bytes[1:-2]

        # Byte range applied to several paths.
        trimmed = await wrapped._cat([first_path, second_path], start=1, end=-1)
        assert set(trimmed.values()) == {first_bytes[1:-1], second_bytes[1:-1]}
59+
60+
@pytest.mark.asyncio
async def test_basic_crud_operations():
    """Touch, read, overwrite and remove files through the async wrapper."""
    existing = ".test.fakedata.1.csv"
    created = ".test.fakedata.3.csv"

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        # Create
        await wrapped._touch(created)
        assert await wrapped._exists(created)

        # Read
        assert await wrapped._cat(existing) == b"a,b\n1,2\n"

        # Update
        await wrapped._pipe(existing, b"a,b\n5,6\n")
        assert await wrapped._cat(existing) == b"a,b\n5,6\n"

        # Delete
        await wrapped._rm(existing)
        still_there = await wrapped._exists(existing)
        assert not still_there
78+
79+
@pytest.mark.asyncio
async def test_error_handling():
    """``_cat`` and ``_rm`` on a missing path raise ``FileNotFoundError``."""
    wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))
    missing = ".test.non_existent.csv"

    # Same expectation for both operations, checked in the original order.
    for operation_name in ("_cat", "_rm"):
        with pytest.raises(FileNotFoundError):
            await getattr(wrapped, operation_name)(missing)
89+
90+
@pytest.mark.asyncio
async def test_concurrent_operations():
    """Several ``_cat`` calls gathered together return results in call order."""
    requested = (
        ".test.fakedata.1.csv",
        ".test.fakedata.2.csv",
        ".test.fakedata.1.csv",
    )

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        # One coroutine per requested path, awaited together.
        pending = [wrapped._cat(path) for path in requested]
        results = await asyncio.gather(*pending)

        assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"]
106+
107+
@pytest.mark.asyncio
async def test_directory_operations():
    """Create a directory and confirm it is listed next to the fixture files."""
    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        await wrapped._makedirs("new_directory")
        created = await wrapped._isdir("new_directory")
        assert created

        # Compare on base names only; the listing entries may carry a directory prefix.
        listing = await wrapped._ls(".")
        filenames = list(map(os.path.basename, listing))

        assert ".test.fakedata.1.csv" in filenames
        assert ".test.fakedata.2.csv" in filenames
        assert "new_directory" in filenames
122+
123+
@pytest.mark.asyncio
async def test_batch_operations():
    """Removing a list of paths in one ``_rm`` call deletes every one of them."""
    targets = (".test.fakedata.1.csv", ".test.fakedata.2.csv")

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        # ``_rm`` receives a fresh list, as in a direct call with a list literal.
        await wrapped._rm(list(targets))

        for target in targets:
            assert not await wrapped._exists(target)

0 commit comments

Comments
 (0)