From b81e8f7aaa8fd9215bf671d05b3f66eddfdc31f4 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 18 Feb 2025 15:06:28 -0500 Subject: [PATCH 1/4] fix reference CI --- fsspec/implementations/reference.py | 6 +- .../implementations/tests/test_reference.py | 82 ++++++++++++++----- 2 files changed, 64 insertions(+), 24 deletions(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 9cf529b23..e0b6d0cbe 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -22,9 +22,10 @@ from fsspec.callbacks import DEFAULT_CALLBACK from fsspec.core import filesystem, open, split_protocol from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper -from fsspec.utils import isfilelike, merge_offset_ranges, other_paths +from fsspec.utils import isfilelike, merge_offset_ranges, other_paths, setup_logging logger = logging.getLogger("fsspec.reference") +setup_logging(logger_name="fsspec.reference") class ReferenceNotReachable(RuntimeError): @@ -140,13 +141,13 @@ def __init__( self.root = root self.chunk_sizes = {} - self.out_root = out_root or self.root self.cat_thresh = categorical_threshold self.engine = engine self.cache_size = cache_size self.url = self.root + "/{field}/refs.{record}.parq" # TODO: derive fs from `root` self.fs = fsspec.filesystem("file") if fs is None else fs + self.out_root = self.fs.unstrip_protocol(out_root or self.root) from importlib.util import find_spec @@ -498,7 +499,6 @@ def write(self, field, record, base_url=None, storage_options=None): } else: raise NotImplementedError(f"{self.engine} not supported") - df.to_parquet( fn, engine=self.engine, diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index 9f02e77af..8de58839b 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -489,9 +489,9 @@ def test_missing_nonasync(m): } refs = {".zarray": json.dumps(zarray)} - m = fsspec.get_mapper("reference://", fo=refs, remote_protocol="memory") - - a = zarr.open_array(m) + a = zarr.open_array( + "reference://", storage_options={"fo": refs, "remote_protocol": "memory"} + ) assert str(a[0]) == "nan" @@ -800,9 +800,15 @@ def test_cached(m, tmpdir): @pytest.fixture() def lazy_refs(m): zarr = pytest.importorskip("zarr") - l = LazyReferenceMapper.create("memory://refs", fs=m) - g = zarr.open(l, mode="w") + l = LazyReferenceMapper.create("memory://refs.parquet", fs=m) + g = zarr.open( + "reference://", + storage_options={"fo": "memory://refs.parquet", "remote_options": "memory"}, + zarr_format=2, + mode="w", + ) g.create_dataset(name="data", shape=(100,), chunks=(10,), dtype="int64") + g.store.fs.references.flush() return l @@ -814,7 +820,7 @@ def test_append_parquet(lazy_refs, m): assert lazy_refs["data/0"] == b"data" lazy_refs.flush() - lazy2 = LazyReferenceMapper("memory://refs", fs=m) + lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m) assert lazy2["data/0"] == b"data" with pytest.raises(KeyError): lazy_refs["data/1"] @@ -822,7 +828,7 @@ def test_append_parquet(lazy_refs, m): assert lazy2["data/1"] == b"Bdata" lazy2.flush() - lazy2 = LazyReferenceMapper("memory://refs", fs=m) + lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m) assert lazy2["data/0"] == b"data" assert lazy2["data/1"] == b"Bdata" lazy2["data/1"] = b"Adata" @@ -831,7 +837,7 @@ def test_append_parquet(lazy_refs, m): assert "data/0" not in lazy2 lazy2.flush() - lazy2 = LazyReferenceMapper("memory://refs", fs=m) + lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m) with pytest.raises(KeyError): lazy2["data/0"] assert lazy2["data/1"] == b"Adata" @@ -847,45 +853,71 @@ def test_deep_parq(m, engine): fs=m, engine=engine, ) - g = zarr.open_group(lz, mode="w") + g = zarr.open_group( + "reference://", + mode="w", + storage_options={"fo": "memory://out.parq", "remote_protocol": "memory"}, + zarr_version=2, + ) g2 = g.create_group("instant") - g2.create_dataset(name="one", data=[1, 2, 3]) + arr = g2.create_dataset(name="one", shape=(3,), dtype="int64") + arr[:] = [1, 2, 3] + g.store.fs.references.flush() lz.flush() lz = fsspec.implementations.reference.LazyReferenceMapper( "memory://out.parq", fs=m, engine=engine ) - g = zarr.open_group(lz) - assert g.instant.one[:].tolist() == [1, 2, 3] - assert sorted(_["name"] for _ in lz.ls("")) == [".zgroup", ".zmetadata", "instant"] + g = zarr.open_group( + "reference://", + storage_options={"fo": "memory://out.parq", "remote_protocol": "memory"}, + zarr_version=2, + ) + assert g["instant"]["one"][:].tolist() == [1, 2, 3] + assert sorted(_["name"] for _ in lz.ls("")) == [ + ".zattrs", + ".zgroup", + ".zmetadata", + "instant", + ] assert sorted(_["name"] for _ in lz.ls("instant")) == [ + "instant/.zattrs", "instant/.zgroup", "instant/one", ] assert sorted(_["name"] for _ in lz.ls("instant/one")) == [ "instant/one/.zarray", + "instant/one/.zattrs", "instant/one/0", ] def test_parquet_no_data(m): zarr = pytest.importorskip("zarr") - lz = fsspec.implementations.reference.LazyReferenceMapper.create( + fsspec.implementations.reference.LazyReferenceMapper.create( "memory://out.parq", fs=m ) - - g = zarr.open_group(lz, mode="w") + g = zarr.open_group( + "reference://", + storage_options={ + "fo": "memory://out.parq", + "fs": m, + "remote_protocol": "memory", + }, + zarr_format=2, + mode="w", + ) arr = g.create_dataset( name="one", dtype="int32", shape=(10,), chunks=(5,), - compression=None, + compressor=None, fill_value=1, ) - lz.flush() + g.store.fs.references.flush() assert (arr[:] == 1).all() @@ -896,16 +928,24 @@ def test_parquet_no_references(m): "memory://out.parq", fs=m ) - g = zarr.open_group(lz, mode="w") + g = zarr.open_group( + "reference://", + storage_options={ + "fo": "memory://out.parq", + "fs": m, + "remote_protocol": "memory", + }, + zarr_format=2, + mode="w", + ) arr = g.create_dataset( name="one", dtype="int32", shape=(), chunks=(), - compression=None, + compressor=None, fill_value=1, ) lz.flush() - arr[...] assert arr[...].tolist() == 1 # scalar, equal to fill value From d453213ac2069027631c132005da50ec149d4bbf Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 18 Feb 2025 15:20:22 -0500 Subject: [PATCH 2/4] maybe it's still v2 --- fsspec/implementations/tests/test_reference.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index 8de58839b..028a22a40 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -843,10 +843,19 @@ def test_append_parquet(lazy_refs, m): assert lazy2["data/1"] == b"Adata" +def skip_zarr_2(): + import zarr + from packaging.version import parse + + if parse(zarr.__version__) < parse("3.0"): + pytest.skip("Zarr 3 required") + + @pytest.mark.parametrize("engine", ["fastparquet", "pyarrow"]) def test_deep_parq(m, engine): pytest.importorskip("kerchunk") zarr = pytest.importorskip("zarr") + skip_zarr_2() lz = fsspec.implementations.reference.LazyReferenceMapper.create( "memory://out.parq", @@ -896,6 +905,7 @@ def test_deep_parq(m, engine): def test_parquet_no_data(m): zarr = pytest.importorskip("zarr") + skip_zarr_2() fsspec.implementations.reference.LazyReferenceMapper.create( "memory://out.parq", fs=m ) @@ -924,6 +934,7 @@ def test_parquet_no_data(m): def test_parquet_no_references(m): zarr = pytest.importorskip("zarr") + skip_zarr_2() lz = fsspec.implementations.reference.LazyReferenceMapper.create( "memory://out.parq", fs=m ) From d632462e8391074eaaf4b4202f5c9245830e520a Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 18 Feb 2025 15:30:30 -0500 Subject: [PATCH 3/4] one more --- fsspec/implementations/tests/test_reference.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index 028a22a40..3d3a6140b 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -800,6 +800,7 @@ def test_cached(m, tmpdir): @pytest.fixture() def lazy_refs(m): zarr = pytest.importorskip("zarr") + skip_zarr_2() l = LazyReferenceMapper.create("memory://refs.parquet", fs=m) g = zarr.open( "reference://", From dc72b5feb1e64ce76e3b0d034975162ff796b279 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 18 Feb 2025 15:31:02 -0500 Subject: [PATCH 4/4] remove logging --- fsspec/implementations/reference.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index e0b6d0cbe..26c67a563 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -22,10 +22,9 @@ from fsspec.callbacks import DEFAULT_CALLBACK from fsspec.core import filesystem, open, split_protocol from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper -from fsspec.utils import isfilelike, merge_offset_ranges, other_paths, setup_logging +from fsspec.utils import isfilelike, merge_offset_ranges, other_paths logger = logging.getLogger("fsspec.reference") -setup_logging(logger_name="fsspec.reference") class ReferenceNotReachable(RuntimeError):