3 changes: 1 addition & 2 deletions fsspec/implementations/reference.py
@@ -140,13 +140,13 @@ def __init__(

self.root = root
self.chunk_sizes = {}
self.out_root = out_root or self.root
self.cat_thresh = categorical_threshold
self.engine = engine
self.cache_size = cache_size
self.url = self.root + "/{field}/refs.{record}.parq"
# TODO: derive fs from `root`
self.fs = fsspec.filesystem("file") if fs is None else fs
self.out_root = self.fs.unstrip_protocol(out_root or self.root)

from importlib.util import find_spec

@@ -498,7 +498,6 @@ def write(self, field, record, base_url=None, storage_options=None):
}
else:
raise NotImplementedError(f"{self.engine} not supported")

df.to_parquet(
fn,
engine=self.engine,
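Note on the change above: out_root is now passed through the target filesystem's unstrip_protocol, and the assignment moves below the line that creates self.fs (which it now needs), so the root recorded for output always carries an explicit protocol prefix. A minimal sketch of that behaviour, assuming a memory filesystem and an illustrative path:

import fsspec

fs = fsspec.filesystem("memory")
# A bare path gains the filesystem's protocol prefix ...
assert fs.unstrip_protocol("out.parq") == "memory://out.parq"
# ... while an already qualified URL comes back unchanged.
assert fs.unstrip_protocol("memory://out.parq") == "memory://out.parq"
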
94 changes: 73 additions & 21 deletions fsspec/implementations/tests/test_reference.py
@@ -489,9 +489,9 @@ def test_missing_nonasync(m):
}
refs = {".zarray": json.dumps(zarray)}

m = fsspec.get_mapper("reference://", fo=refs, remote_protocol="memory")

a = zarr.open_array(m)
a = zarr.open_array(
"reference://", storage_options={"fo": refs, "remote_protocol": "memory"}
)
assert str(a[0]) == "nan"


@@ -800,9 +800,16 @@ def test_cached(m, tmpdir):
@pytest.fixture()
def lazy_refs(m):
zarr = pytest.importorskip("zarr")
l = LazyReferenceMapper.create("memory://refs", fs=m)
g = zarr.open(l, mode="w")
skip_zarr_2()
l = LazyReferenceMapper.create("memory://refs.parquet", fs=m)
g = zarr.open(
"reference://",
storage_options={"fo": "memory://refs.parquet", "remote_options": "memory"},
zarr_format=2,
mode="w",
)
g.create_dataset(name="data", shape=(100,), chunks=(10,), dtype="int64")
g.store.fs.references.flush()
return l
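
Note on the fixture above: it no longer hands the LazyReferenceMapper straight to zarr.open; presumably because newer zarr will not take an fsspec mapper as a store, it opens the "reference://" URL with storage_options and zarr_format=2 instead. zarr then builds its own ReferenceFileSystem, so the references it writes are buffered on g.store.fs.references rather than on the mapper created first, hence the explicit flush. A hedged sketch of reading the flushed store back, reusing the paths above (the lazy2 name and the printed listing are illustrative, not part of the PR):

from fsspec.implementations.reference import LazyReferenceMapper

# Reopen the parquet reference store with a fresh mapper; whatever zarr wrote
# and g.store.fs.references.flush() persisted should be visible here.
lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m)
print(sorted(entry["name"] for entry in lazy2.ls("")))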


@@ -814,15 +821,15 @@ def test_append_parquet(lazy_refs, m):
assert lazy_refs["data/0"] == b"data"
lazy_refs.flush()

lazy2 = LazyReferenceMapper("memory://refs", fs=m)
lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m)
assert lazy2["data/0"] == b"data"
with pytest.raises(KeyError):
lazy_refs["data/1"]
lazy2["data/1"] = b"Bdata"
assert lazy2["data/1"] == b"Bdata"
lazy2.flush()

lazy2 = LazyReferenceMapper("memory://refs", fs=m)
lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m)
assert lazy2["data/0"] == b"data"
assert lazy2["data/1"] == b"Bdata"
lazy2["data/1"] = b"Adata"
@@ -831,81 +838,126 @@ def test_append_parquet(lazy_refs, m):
assert "data/0" not in lazy2
lazy2.flush()

lazy2 = LazyReferenceMapper("memory://refs", fs=m)
lazy2 = LazyReferenceMapper("memory://refs.parquet", fs=m)
with pytest.raises(KeyError):
lazy2["data/0"]
assert lazy2["data/1"] == b"Adata"


def skip_zarr_2():
import zarr
from packaging.version import parse

if parse(zarr.__version__) < parse("3.0"):
pytest.skip("Zarr 3 required")


@pytest.mark.parametrize("engine", ["fastparquet", "pyarrow"])
def test_deep_parq(m, engine):
pytest.importorskip("kerchunk")
zarr = pytest.importorskip("zarr")
skip_zarr_2()

lz = fsspec.implementations.reference.LazyReferenceMapper.create(
"memory://out.parq",
fs=m,
engine=engine,
)
g = zarr.open_group(lz, mode="w")
g = zarr.open_group(
"reference://",
mode="w",
storage_options={"fo": "memory://out.parq", "remote_protocol": "memory"},
zarr_version=2,
)

g2 = g.create_group("instant")
g2.create_dataset(name="one", data=[1, 2, 3])
arr = g2.create_dataset(name="one", shape=(3,), dtype="int64")
arr[:] = [1, 2, 3]
g.store.fs.references.flush()
lz.flush()

lz = fsspec.implementations.reference.LazyReferenceMapper(
"memory://out.parq", fs=m, engine=engine
)
g = zarr.open_group(lz)
assert g.instant.one[:].tolist() == [1, 2, 3]
assert sorted(_["name"] for _ in lz.ls("")) == [".zgroup", ".zmetadata", "instant"]
g = zarr.open_group(
"reference://",
storage_options={"fo": "memory://out.parq", "remote_protocol": "memory"},
zarr_version=2,
)
assert g["instant"]["one"][:].tolist() == [1, 2, 3]
assert sorted(_["name"] for _ in lz.ls("")) == [
".zattrs",
".zgroup",
".zmetadata",
"instant",
]
assert sorted(_["name"] for _ in lz.ls("instant")) == [
"instant/.zattrs",
"instant/.zgroup",
"instant/one",
]

assert sorted(_["name"] for _ in lz.ls("instant/one")) == [
"instant/one/.zarray",
"instant/one/.zattrs",
"instant/one/0",
]


def test_parquet_no_data(m):
zarr = pytest.importorskip("zarr")
lz = fsspec.implementations.reference.LazyReferenceMapper.create(
skip_zarr_2()
fsspec.implementations.reference.LazyReferenceMapper.create(
"memory://out.parq", fs=m
)

g = zarr.open_group(lz, mode="w")
g = zarr.open_group(
"reference://",
storage_options={
"fo": "memory://out.parq",
"fs": m,
"remote_protocol": "memory",
},
zarr_format=2,
mode="w",
)
arr = g.create_dataset(
name="one",
dtype="int32",
shape=(10,),
chunks=(5,),
compression=None,
compressor=None,
fill_value=1,
)
lz.flush()
g.store.fs.references.flush()

assert (arr[:] == 1).all()
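
Note on this test and the one that follows: create_dataset now passes compressor=None instead of compression=None, apparently the keyword spelling newer zarr accepts for disabling compression on these v2-format arrays. Because no chunk references are ever written, every read falls back to the fill value, which is what the assertions check; roughly, for the scalar case in the next test:

value = arr[...]            # no reference exists for the single chunk
assert value.tolist() == 1  # so the fill_value of 1 is returned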


def test_parquet_no_references(m):
zarr = pytest.importorskip("zarr")
skip_zarr_2()
lz = fsspec.implementations.reference.LazyReferenceMapper.create(
"memory://out.parq", fs=m
)

g = zarr.open_group(lz, mode="w")
g = zarr.open_group(
"reference://",
storage_options={
"fo": "memory://out.parq",
"fs": m,
"remote_protocol": "memory",
},
zarr_format=2,
mode="w",
)
arr = g.create_dataset(
name="one",
dtype="int32",
shape=(),
chunks=(),
compression=None,
compressor=None,
fill_value=1,
)
lz.flush()
arr[...]

assert arr[...].tolist() == 1 # scalar, equal to fill value