From 4f9c3b18b9f8c1db05c2dc99cf117cce6d5438c4 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Sat, 16 Nov 2024 16:21:04 +0100 Subject: [PATCH 01/10] [WIP] Add reduce for AbstractBufferedFile --- fsspec/implementations/http.py | 22 ---------------------- fsspec/spec.py | 23 +++++++++++++++++++++++ 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index 47dfb88f9..61ec39436 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -692,25 +692,6 @@ async def async_fetch_range(self, start, end): _fetch_range = sync_wrapper(async_fetch_range) - def __reduce__(self): - return ( - reopen, - ( - self.fs, - self.url, - self.mode, - self.blocksize, - self.cache.name if self.cache else "none", - self.size, - ), - ) - - -def reopen(fs, url, mode, blocksize, cache_type, size=None): - return fs.open( - url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size - ) - magic_check = re.compile("([*[])") @@ -760,9 +741,6 @@ def close(self): asyncio.run_coroutine_threadsafe(self._close(), self.loop) super().close() - def __reduce__(self): - return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name) - class AsyncStreamFile(AbstractAsyncStreamedFile): def __init__( diff --git a/fsspec/spec.py b/fsspec/spec.py index 9659f2e98..7e37e1edb 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -2049,6 +2049,22 @@ def seekable(self): def writable(self): """Whether opened for writing""" return self.mode in {"wb", "ab"} and not self.closed + + def __reduce__(self): + if self.mode != "rb": + raise RuntimeError("Pickling a writeable file is not supported") + + return reopen, ( + self.fs, + self.path, + self.mode, + self.blocksize, + self.loc, + self.size, + self.autocommit, + self.cache.name if self.cache else "none", + self.kwargs, + ) def __del__(self): if not self.closed: @@ -2064,3 +2080,10 @@ def __enter__(self): def __exit__(self, *args): self.close() + + +def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs): + file = fs.open(path, mode=mode, block_size=blocksize, autocommit=autocommit, cache_type=cache_type, size=size, **kwargs) + if loc > 0: + file.seek(loc) + return file From b971911c6a844f2edae20b58265fa2d427581560 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 18 Nov 2024 14:22:38 -0600 Subject: [PATCH 02/10] Lint --- fsspec/spec.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/fsspec/spec.py b/fsspec/spec.py index 7e37e1edb..ea819bcf1 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -2049,20 +2049,20 @@ def seekable(self): def writable(self): """Whether opened for writing""" return self.mode in {"wb", "ab"} and not self.closed - + def __reduce__(self): if self.mode != "rb": raise RuntimeError("Pickling a writeable file is not supported") - + return reopen, ( - self.fs, + self.fs, self.path, - self.mode, - self.blocksize, - self.loc, - self.size, - self.autocommit, - self.cache.name if self.cache else "none", + self.mode, + self.blocksize, + self.loc, + self.size, + self.autocommit, + self.cache.name if self.cache else "none", self.kwargs, ) @@ -2083,7 +2083,15 @@ def __exit__(self, *args): def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs): - file = fs.open(path, mode=mode, block_size=blocksize, autocommit=autocommit, cache_type=cache_type, size=size, **kwargs) + file = fs.open( + path, + mode=mode, + block_size=blocksize, + autocommit=autocommit, + cache_type=cache_type, + size=size, + **kwargs, + ) if loc > 0: file.seek(loc) return file From ea45a3f51d771b6bb06e26fb9b4604d9c3163eb3 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 10:01:11 -0600 Subject: [PATCH 03/10] Add test --- fsspec/tests/test_spec.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 3927c6550..49601ae6a 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -16,6 +16,8 @@ from fsspec.implementations.local import LocalFileSystem from fsspec.spec import AbstractBufferedFile, AbstractFileSystem +from fsspec.tests.conftest import data, server + PATHS_FOR_GLOB_TESTS = ( {"name": "test0.json", "type": "file", "size": 100}, {"name": "test0.yaml", "type": "file", "size": 100}, @@ -744,6 +746,26 @@ def test_cache(): assert len(DummyTestFS._cache) == 0 +def test_cache_not_pickled(server): + fs = fsspec.filesystem("http") + filepath = server.realfile + length = 3 + f = fs.open(filepath, mode="rb") + assert not f.cache.cache # No cache initially + assert f.read(length=length) == data[:length] + assert f.cache.cache == data # Cache is populated + + # Roundtrip through pickle + import pickle + + f2 = pickle.loads(pickle.dumps(f)) + assert not f2.cache.cache # No cache initially + assert ( + f2.read(length=length) == data[length : 2 * length] + ) # Read file from previous seek point + assert f2.cache.cache == data[length:] # Cache is populated + + def test_current(): fs = DummyTestFS() fs2 = DummyTestFS(arg=1) From b8a76ead47b63888e37a0c88d32db6f454ab1f8f Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 10:01:32 -0600 Subject: [PATCH 04/10] Lint --- fsspec/tests/test_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 49601ae6a..84ccad22e 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -16,7 +16,7 @@ from fsspec.implementations.local import LocalFileSystem from fsspec.spec import AbstractBufferedFile, AbstractFileSystem -from fsspec.tests.conftest import data, server +from fsspec.tests.conftest import data PATHS_FOR_GLOB_TESTS = ( {"name": "test0.json", "type": "file", "size": 100}, From f90e0fa9c52151fec48abb41f941e54ae90e12da Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 10:52:25 -0600 Subject: [PATCH 05/10] Cache http --- fsspec/tests/test_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 84ccad22e..dc679e32a 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -747,7 +747,7 @@ def test_cache(): def test_cache_not_pickled(server): - fs = fsspec.filesystem("http") + fs = fsspec.filesystem("http", cache_type="readahead") filepath = server.realfile length = 3 f = fs.open(filepath, mode="rb") From d402ac5b22a639a42a1b166215f96b49098e5a35 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 10:59:12 -0600 Subject: [PATCH 06/10] Try a different way to cache --- fsspec/tests/test_spec.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index dc679e32a..5db806e45 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -747,7 +747,8 @@ def test_cache(): def test_cache_not_pickled(server): - fs = fsspec.filesystem("http", cache_type="readahead") + # fs = fsspec.filesystem("http", cache_type="readahead") + fs = fsspec.filesystem("simplecache", target_protocol="http") filepath = server.realfile length = 3 f = fs.open(filepath, mode="rb") From d97e87a9b7aa4bfaed00c8e9958685236802a858 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 11:17:17 -0600 Subject: [PATCH 07/10] Another --- fsspec/tests/test_spec.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 5db806e45..42a557f2c 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -747,11 +747,11 @@ def test_cache(): def test_cache_not_pickled(server): - # fs = fsspec.filesystem("http", cache_type="readahead") - fs = fsspec.filesystem("simplecache", target_protocol="http") + fs = fsspec.filesystem("http") + # fs = fsspec.filesystem("readahead", target_protocol="http") filepath = server.realfile length = 3 - f = fs.open(filepath, mode="rb") + f = fs.open(filepath, mode="rb", cache_type="readahead") assert not f.cache.cache # No cache initially assert f.read(length=length) == data[:length] assert f.cache.cache == data # Cache is populated From 095054042a46ef3ab58eff81ed66e1401e33f068 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 11:27:12 -0600 Subject: [PATCH 08/10] More --- .github/workflows/main.yaml | 2 +- fsspec/tests/test_spec.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 653eb8084..18a6fb41e 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -40,7 +40,7 @@ jobs: shell: bash -l {0} run: | pip install -e .[test_full] - pytest -v + pytest -v fsspec/tests/test_spec.py win: name: pytest-win diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 42a557f2c..798e162b2 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -747,11 +747,15 @@ def test_cache(): def test_cache_not_pickled(server): - fs = fsspec.filesystem("http") + fs = fsspec.filesystem( + "http", + cache_type="readahead", + headers={"give_length": "true", "head_ok": "true"}, + ) # fs = fsspec.filesystem("readahead", target_protocol="http") filepath = server.realfile length = 3 - f = fs.open(filepath, mode="rb", cache_type="readahead") + f = fs.open(filepath, mode="rb") assert not f.cache.cache # No cache initially assert f.read(length=length) == data[:length] assert f.cache.cache == data # Cache is populated From 937a5895ebf7aafe49f856a84bc195b338bece3d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 11:31:10 -0600 Subject: [PATCH 09/10] Revert temp changes --- .github/workflows/main.yaml | 2 +- fsspec/tests/test_spec.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 18a6fb41e..653eb8084 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -40,7 +40,7 @@ jobs: shell: bash -l {0} run: | pip install -e .[test_full] - pytest -v fsspec/tests/test_spec.py + pytest -v win: name: pytest-win diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 798e162b2..3a6d5c433 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -752,10 +752,10 @@ def test_cache_not_pickled(server): cache_type="readahead", headers={"give_length": "true", "head_ok": "true"}, ) - # fs = fsspec.filesystem("readahead", target_protocol="http") filepath = server.realfile length = 3 f = fs.open(filepath, mode="rb") + assert isinstance(f, AbstractBufferedFile) assert not f.cache.cache # No cache initially assert f.read(length=length) == data[:length] assert f.cache.cache == data # Cache is populated From 140fd9e8fab507415cc4f4c359abbf4dd440162f Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Nov 2024 11:49:15 -0600 Subject: [PATCH 10/10] Lint --- fsspec/tests/test_spec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index 3a6d5c433..742f53e06 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -15,7 +15,6 @@ from fsspec.implementations.http import HTTPFileSystem from fsspec.implementations.local import LocalFileSystem from fsspec.spec import AbstractBufferedFile, AbstractFileSystem - from fsspec.tests.conftest import data PATHS_FOR_GLOB_TESTS = (