Skip to content

Commit 0eed18f

Browse files
authored
refactor(referencefs): only override ReferenceFileSystem._open for fsspec < 2025.2.0 (#884)
1 parent 245204f commit 0eed18f

File tree

4 files changed

+24
-11
lines changed

4 files changed

+24
-11
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ dependencies = [
2424
"tqdm",
2525
"numpy>=1,<3",
2626
"pandas>=2.0.0",
27+
"packaging",
2728
"pyarrow",
2829
"typing-extensions",
2930
"python-dateutil>=2",
3031
"attrs>=21.3.0",
32+
"fsspec>=2024.2.0",
3133
"s3fs>=2024.2.0",
3234
"gcsfs>=2024.2.0",
3335
"adlfs>=2024.2.0",

src/datachain/fs/__init__.py

Whitespace-only changes.

src/datachain/fs/reference.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import fsspec
2+
from packaging.version import Version, parse
3+
4+
# fsspec==2025.2.0 added support for a proper `open()` in `ReferenceFileSystem`.
5+
# Remove this module when `fsspec` minimum version requirement can be bumped.
6+
if parse(fsspec.__version__) < Version("2025.2.0"):
7+
from fsspec.core import split_protocol
8+
from fsspec.implementations import reference
9+
10+
class ReferenceFileSystem(reference.ReferenceFileSystem):
11+
def _open(self, path, mode="rb", *args, **kwargs):
12+
# overriding because `fsspec`'s `ReferenceFileSystem._open`
13+
# reads the whole file in-memory.
14+
(uri,) = self.references[path]
15+
protocol, _ = split_protocol(uri)
16+
return self.fss[protocol].open(uri, mode, *args, **kwargs)
17+
else:
18+
from fsspec.implementations.reference import ReferenceFileSystem # type: ignore[no-redef] # noqa: I001
19+
20+
21+
__all__ = ["ReferenceFileSystem"]

src/datachain/lib/arrow.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
from itertools import islice
33
from typing import TYPE_CHECKING, Any, Optional
44

5-
import fsspec.implementations.reference
65
import orjson
76
import pyarrow as pa
8-
from fsspec.core import split_protocol
97
from pyarrow.dataset import CsvFileFormat, dataset
108
from tqdm.auto import tqdm
119

10+
from datachain.fs.reference import ReferenceFileSystem
1211
from datachain.lib.data_model import dict_to_data_model
1312
from datachain.lib.file import ArrowRow, File
1413
from datachain.lib.model_store import ModelStore
@@ -27,15 +26,6 @@
2726
DATACHAIN_SIGNAL_SCHEMA_PARQUET_KEY = b"DataChain SignalSchema"
2827

2928

30-
class ReferenceFileSystem(fsspec.implementations.reference.ReferenceFileSystem):
31-
def _open(self, path, mode="rb", *args, **kwargs):
32-
# overriding because `fsspec`'s `ReferenceFileSystem._open`
33-
# reads the whole file in-memory.
34-
(uri,) = self.references[path]
35-
protocol, _ = split_protocol(uri)
36-
return self.fss[protocol].open(uri, mode, *args, **kwargs)
37-
38-
3929
class ArrowGenerator(Generator):
4030
DEFAULT_BATCH_SIZE = 2**17 # same as `pyarrow._dataset._DEFAULT_BATCH_SIZE`
4131

0 commit comments

Comments
 (0)