From 4f9c3b18b9f8c1db05c2dc99cf117cce6d5438c4 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Sat, 16 Nov 2024 16:21:04 +0100
Subject: [PATCH] [WIP] Add reduce for AbstractBufferedFile

Move the pickling support (`__reduce__` and the module-level `reopen`
helper) out of fsspec/implementations/http.py and into fsspec/spec.py.

Only files in "rb" mode can be pickled; any other mode raises
RuntimeError. The reopened file is moved back to the pickled position,
and it is closed again if that seek fails.

---
 fsspec/implementations/http.py | 22 ----------------------
 fsspec/spec.py                 | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 22 deletions(-)

diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py
index 47dfb88f9..61ec39436 100644
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -692,25 +692,6 @@ async def async_fetch_range(self, start, end):
 
     _fetch_range = sync_wrapper(async_fetch_range)
 
-    def __reduce__(self):
-        return (
-            reopen,
-            (
-                self.fs,
-                self.url,
-                self.mode,
-                self.blocksize,
-                self.cache.name if self.cache else "none",
-                self.size,
-            ),
-        )
-
-
-def reopen(fs, url, mode, blocksize, cache_type, size=None):
-    return fs.open(
-        url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
-    )
-
 
 magic_check = re.compile("([*[])")
 
@@ -760,9 +741,6 @@ def close(self):
         asyncio.run_coroutine_threadsafe(self._close(), self.loop)
         super().close()
 
-    def __reduce__(self):
-        return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)
-
 
 class AsyncStreamFile(AbstractAsyncStreamedFile):
     def __init__(
diff --git a/fsspec/spec.py b/fsspec/spec.py
index 9659f2e98..7e37e1edb 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -2049,6 +2049,32 @@ def seekable(self):
     def writable(self):
         """Whether opened for writing"""
         return self.mode in {"wb", "ab"} and not self.closed
+
+    def __reduce__(self):
+        """Pickle this file as the arguments needed to reopen it
+
+        Unpickling calls ``reopen`` with these arguments, which opens the
+        path again through ``self.fs`` and restores the current position.
+
+        Raises
+        ------
+        RuntimeError
+            If the file's mode is not "rb".
+        """
+        if self.mode != "rb":
+            raise RuntimeError("Pickling a writeable file is not supported")
+
+        return reopen, (
+            self.fs,
+            self.path,
+            self.mode,
+            self.blocksize,
+            self.loc,
+            self.size,
+            self.autocommit,
+            self.cache.name if self.cache else "none",
+            self.kwargs,
+        )
 
     def __del__(self):
         if not self.closed:
@@ -2064,3 +2090,29 @@ def __enter__(self):
 
     def __exit__(self, *args):
         self.close()
+
+
+def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
+    """Reopen a file from the arguments produced by ``__reduce__``
+
+    The file is opened again with ``fs.open`` and, if ``loc`` is greater
+    than zero, moved to that position. ``kwargs`` is a dict of extra keyword
+    arguments forwarded to ``fs.open``.
+    """
+    file = fs.open(
+        path,
+        mode=mode,
+        block_size=blocksize,
+        autocommit=autocommit,
+        cache_type=cache_type,
+        size=size,
+        **kwargs,
+    )
+    if loc > 0:
+        try:
+            file.seek(loc)
+        except Exception:
+            # don't leak the freshly opened file if it cannot be repositioned
+            file.close()
+            raise
+    return file