-
Notifications
You must be signed in to change notification settings - Fork 19
Add Kvikio backend entrypoint #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9deadb7
aa2dc91
7fb4b94
743fe7d
5d501e4
facf5f7
f3f5189
9c98d19
dd8bc57
d2da1e4
b87c3c2
87cb74e
d7394ef
1b23fef
ca0cf45
97260d6
5d27b26
85491d7
c470b97
95efa18
d684dad
ae2a7f1
15fbafd
f3df115
4e1857a
7345b61
e2b410e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
from . import _version | ||
from .accessors import CupyDataArrayAccessor, CupyDatasetAccessor # noqa | ||
from .accessors import CupyDataArrayAccessor, CupyDatasetAccessor # noqa: F401 | ||
from .kvikio import KvikioBackendEntrypoint # noqa: F401 | ||
|
||
__version__ = _version.get_versions()["version"] |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,230 @@ | ||||||
""" | ||||||
:doc:`kvikIO <kvikio:index>` backend for xarray to read Zarr stores directly into CuPy | ||||||
arrays in GPU memory. | ||||||
""" | ||||||
|
||||||
import os | ||||||
import warnings | ||||||
|
||||||
import cupy as cp | ||||||
import numpy as np | ||||||
from xarray import Variable | ||||||
from xarray.backends import zarr as zarr_backend | ||||||
from xarray.backends.common import _normalize_path # TODO: can this be public | ||||||
from xarray.backends.store import StoreBackendEntrypoint | ||||||
from xarray.backends.zarr import ZarrArrayWrapper, ZarrBackendEntrypoint, ZarrStore | ||||||
from xarray.core import indexing | ||||||
from xarray.core.utils import close_on_error # TODO: can this be public. | ||||||
|
||||||
try:
    import kvikio.zarr
    import zarr

    has_kvikio = True
except ImportError:
    # kvikio/zarr are optional dependencies; when missing, the backend below
    # still registers but reports itself unavailable via ``available``.
    has_kvikio = False
|
||||||
|
||||||
# TODO: minimum kvikio version for supporting consolidated | ||||||
# TODO: minimum xarray version for ZarrArrayWrapper._array 2023.10.0? | ||||||
|
||||||
|
||||||
class DummyZarrArrayWrapper(ZarrArrayWrapper):
    """Wrap an in-memory numpy array so it can masquerade as a Zarr array
    wrapper and pass through Xarray's backend layers (see
    ``EagerCupyZarrArrayWrapper.get_duck_array``).
    """

    def __init__(self, array: np.ndarray):
        assert isinstance(array, np.ndarray)
        self._array = array
        self.filters = None
        self.dtype = array.dtype
        self.shape = array.shape

    def __array__(self, dtype=None):
        # Accept the optional ``dtype`` argument of the NumPy ``__array__``
        # protocol; recent NumPy warns when it cannot pass ``dtype`` through.
        # ``np.asarray`` with ``dtype=None`` returns the array unchanged,
        # preserving the previous behavior.
        return np.asarray(self._array, dtype=dtype)

    def get_array(self):
        return self._array

    def __getitem__(self, key):
        return self._array[key]
|
||||||
|
||||||
class CupyZarrArrayWrapper(ZarrArrayWrapper):
    """Zarr array wrapper whose ``__array__`` defers to the underlying
    Zarr array (left in device memory by the GDS store)."""

    def __array__(self):
        underlying = self.get_array()
        return underlying
|
||||||
|
||||||
class EagerCupyZarrArrayWrapper(ZarrArrayWrapper):
    """Used to wrap dimension coordinates.

    ``[:]`` reads the whole array (to device memory) and ``.get()`` copies
    it to the host, so these arrays are materialized eagerly as numpy.
    """

    def __array__(self):
        return self._array[:].get()

    def get_duck_array(self):
        # Total hack: present the host copy as a fake Zarr array so it
        # survives Xarray's backend layers.
        host_copy = self._array[:].get()
        return DummyZarrArrayWrapper(host_copy)
|
||||||
|
||||||
class GDSZarrStore(ZarrStore):
    """ZarrStore variant that opens groups through ``kvikio.zarr.GDSStore``
    so chunk reads are delivered as CuPy arrays (``meta_array=cp.empty(())``).
    """

    @classmethod
    def open_group(
        cls,
        store,
        mode="r",
        synchronizer=None,
        group=None,
        consolidated=False,
        consolidate_on_close=False,
        chunk_store=None,
        storage_options=None,
        append_dim=None,
        write_region=None,
        safe_chunks=True,
        stacklevel=2,
    ):
        """Open a Zarr group via kvikIO and return a ``GDSZarrStore`` for it.

        Mirrors ``xarray.backends.zarr.ZarrStore.open_group`` except that the
        store is wrapped in ``kvikio.zarr.GDSStore`` and ``meta_array`` makes
        zarr allocate CuPy output buffers.
        """
        # zarr doesn't support pathlib.Path objects yet. zarr-python#601
        if isinstance(store, os.PathLike):
            store = os.fspath(store)

        open_kwargs = {
            "mode": mode,
            "synchronizer": synchronizer,
            "path": group,
            ########## NEW STUFF
            # meta_array tells zarr what array type to allocate for reads:
            # a 0-d cupy array means chunks come back as cupy arrays.
            "meta_array": cp.empty(()),
        }
        open_kwargs["storage_options"] = storage_options

        if chunk_store:
            open_kwargs["chunk_store"] = chunk_store
        # NOTE(review): this reset makes the ``consolidated is None`` fallback
        # branch below unreachable (it can only run when consolidated is None,
        # which is rewritten to False here) -- confirm whether forcing
        # non-consolidated reads is intentional.
        if consolidated is None:
            consolidated = False

        store = kvikio.zarr.GDSStore(store)

        if consolidated is None:
            try:
                zarr_group = zarr.open_consolidated(store, **open_kwargs)
            except KeyError:
                warnings.warn(
                    "Failed to open Zarr store with consolidated metadata, "
                    "falling back to try reading non-consolidated metadata. "
                    "This is typically much slower for opening a dataset. "
                    "To silence this warning, consider:\n"
                    "1. Consolidating metadata in this existing store with "
                    "zarr.consolidate_metadata().\n"
                    "2. Explicitly setting consolidated=False, to avoid trying "
                    "to read consolidate metadata, or\n"
                    "3. Explicitly setting consolidated=True, to raise an "
                    "error in this case instead of falling back to try "
                    "reading non-consolidated metadata.",
                    RuntimeWarning,
                    stacklevel=stacklevel,
                )
                zarr_group = zarr.open_group(store, **open_kwargs)
        elif consolidated:
            # TODO: an option to pass the metadata_key keyword
            zarr_group = zarr.open_consolidated(store, **open_kwargs)
        else:
            zarr_group = zarr.open_group(store, **open_kwargs)

        return cls(
            zarr_group,
            mode,
            consolidate_on_close,
            append_dim,
            write_region,
            safe_chunks,
        )

    def open_store_variable(self, name, zarr_array):
        """Build an ``xarray.Variable`` for ``zarr_array``, choosing a wrapper
        that keeps data variables lazy/on-device but loads dimension
        coordinates eagerly to host memory."""
        try_nczarr = self._mode == "r"
        dimensions, attributes = zarr_backend._get_zarr_dims_and_attrs(
            zarr_array, zarr_backend.DIMENSION_KEY, try_nczarr
        )

        #### Changed from zarr array wrapper
        # we want indexed dimensions to be loaded eagerly
        # Right now we load in to device and then transfer to host
        # But these should be small-ish arrays
        # TODO: can we tell GDSStore to load as numpy array directly
        # not cupy array?
        array_wrapper = EagerCupyZarrArrayWrapper if name in dimensions else CupyZarrArrayWrapper
        data = indexing.LazilyIndexedArray(array_wrapper(zarr_array))

        attributes = dict(attributes)
        encoding = {
            "chunks": zarr_array.chunks,
            "preferred_chunks": dict(zip(dimensions, zarr_array.chunks)),
            "compressor": zarr_array.compressor,
            "filters": zarr_array.filters,
        }
        # _FillValue needs to be in attributes, not encoding, so it will get
        # picked up by decode_cf
        if zarr_array.fill_value is not None:
            attributes["_FillValue"] = zarr_array.fill_value

        return Variable(dimensions, data, attributes, encoding)
|
||||||
|
||||||
class KvikioBackendEntrypoint(ZarrBackendEntrypoint):
    """
    Xarray backend that reads Zarr stores with the 'kvikio' engine.

    For details on the underlying library, see
    :doc:`kvikIO's Zarr page<kvikio:zarr>`.
    """

    available = has_kvikio
    description = "Open zarr files (.zarr) using Kvikio"
    url = "https://docs.rapids.ai/api/kvikio/stable/api/#zarr"

    def guess_can_open(self, filename_or_obj):
        # Disabled on purpose: the inherited ZarrBackendEntrypoint would
        # auto-match Zarr paths, so this engine must be selected explicitly
        # via ``engine="kvikio"``.
        return False

    def open_dataset(
        self,
        filename_or_obj,
        mask_and_scale=True,
        decode_times=True,
        concat_characters=True,
        decode_coords=True,
        drop_variables=None,
        use_cftime=None,
        decode_timedelta=None,
        group=None,
        mode="r",
        synchronizer=None,
        consolidated=None,
        chunk_store=None,
        storage_options=None,
        stacklevel=3,
    ):
        """Open a Zarr store through ``GDSZarrStore`` and decode it into an
        ``xarray.Dataset`` via the standard store entrypoint."""
        path = _normalize_path(filename_or_obj)
        store = GDSZarrStore.open_group(
            path,
            group=group,
            mode=mode,
            synchronizer=synchronizer,
            consolidated=consolidated,
            consolidate_on_close=False,
            chunk_store=chunk_store,
            storage_options=storage_options,
            stacklevel=stacklevel + 1,
        )

        decode_kwargs = dict(
            mask_and_scale=mask_and_scale,
            decode_times=decode_times,
            concat_characters=concat_characters,
            decode_coords=decode_coords,
            drop_variables=drop_variables,
            use_cftime=use_cftime,
            decode_timedelta=decode_timedelta,
        )
        store_entrypoint = StoreBackendEntrypoint()
        # close_on_error closes the store only if decoding raises.
        with close_on_error(store):
            ds = store_entrypoint.open_dataset(store, **decode_kwargs)
        return ds
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import cupy as cp | ||
import numpy as np | ||
import pytest | ||
import xarray as xr | ||
from xarray.core.indexing import ExplicitlyIndexedNDArrayMixin | ||
|
||
kvikio = pytest.importorskip("kvikio") | ||
zarr = pytest.importorskip("zarr") | ||
|
||
import kvikio.zarr # noqa | ||
import xarray.core.indexing # noqa | ||
|
||
|
||
@pytest.fixture
def store(tmp_path):
    """Write a small consolidated Zarr store with uncompressed variables
    and return its path."""
    dataset = xr.Dataset(
        data_vars={
            "a": ("x", np.arange(10), {"foo": "bar"}),
            "scalar": np.array(1),
        },
        coords={"x": ("x", np.arange(-5, 5))},
    )

    # Store raw (uncompressed) chunks for every variable.
    for name in dataset.variables:
        dataset[name].encoding["compressor"] = None

    path = tmp_path / "kvikio.zarr"
    dataset.to_zarr(path, consolidated=True)
    return path
|
||
|
||
def test_entrypoint():
    """The 'kvikio' engine should be registered with xarray."""
    engines = xr.backends.list_engines()
    assert "kvikio" in engines
|
||
|
||
@pytest.mark.parametrize("consolidated", [True, False])
def test_lazy_load(consolidated, store):
    """Opening with engine='kvikio' keeps every data variable lazily indexed."""
    with xr.open_dataset(store, engine="kvikio", consolidated=consolidated) as ds:
        for da in ds.data_vars.values():
            assert isinstance(da.variable._data, ExplicitlyIndexedNDArrayMixin)
|
||
|
||
@pytest.mark.parametrize("indexer", [slice(None), slice(2, 4), 2, [2, 3, 5]])
def test_lazy_indexing(indexer, store):
    """isel keeps variables lazy; compute() materializes cupy arrays."""
    with xr.open_dataset(store, engine="kvikio") as ds:
        subset = ds.isel(x=indexer)
        for da in subset.data_vars.values():
            assert isinstance(da.variable._data, ExplicitlyIndexedNDArrayMixin)

        loaded = subset.compute()
        for da in loaded.data_vars.values():
            # Skip 0-d variables; only array variables are checked for
            # cupy backing.
            if da.ndim == 0:
                continue
            assert isinstance(da.data, cp.ndarray)
Uh oh!
There was an error while loading. Please reload this page.