-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add chunks='auto' support for cftime datasets #10527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 25 commits
eb1a967
852476d
c921c59
1aba531
9429c3d
3c9d27e
5153d2d
62e71e6
cfdc31b
2f16bc7
ce720fa
4fa58c1
e58d6d7
590e503
f953976
6706524
4e56acd
0d008cd
49c4e9c
4594099
5d00b0a
80421ef
d1f7ad3
1b7de62
4407185
d8f45b2
20226c1
11ac9f0
8485df5
2c27877
0983261
c4ec31f
adbf5b2
6c93bc4
74bc0ea
0b9bbd0
e58322f
dbc6ebd
5680663
b5933ed
5db9225
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,17 +5,21 @@ | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
import numpy as np | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
from xarray.core.common import _contains_cftime_datetimes | ||||||||||||||||||||||||||||
from xarray.core.indexing import ImplicitToExplicitIndexingAdapter | ||||||||||||||||||||||||||||
from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint, T_ChunkedArray | ||||||||||||||||||||||||||||
from xarray.namedarray.utils import is_duck_dask_array, module_available | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if TYPE_CHECKING: | ||||||||||||||||||||||||||||
from xarray.core.variable import Variable | ||||||||||||||||||||||||||||
from xarray.namedarray._typing import ( | ||||||||||||||||||||||||||||
T_Chunks, | ||||||||||||||||||||||||||||
_DType, | ||||||||||||||||||||||||||||
_DType_co, | ||||||||||||||||||||||||||||
_NormalizedChunks, | ||||||||||||||||||||||||||||
duckarray, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
from xarray.namedarray.parallelcompat import _Chunks | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||
from dask.array import Array as DaskArray | ||||||||||||||||||||||||||||
|
@@ -264,3 +268,63 @@ def shuffle( | |||||||||||||||||||||||||||
if chunks != "auto": | ||||||||||||||||||||||||||||
raise NotImplementedError("Only chunks='auto' is supported at present.") | ||||||||||||||||||||||||||||
return dask.array.shuffle(x, indexer, axis, chunks="auto") | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
def rechunk( # type: ignore[override] | ||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||
data: T_ChunkedArray, | ||||||||||||||||||||||||||||
chunks: _NormalizedChunks | tuple[int, ...] | _Chunks, | ||||||||||||||||||||||||||||
**kwargs: Any, | ||||||||||||||||||||||||||||
) -> Any: | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
Changes the chunking pattern of the given array. | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Called when the .chunk method is called on an xarray object that is already chunked. | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Parameters | ||||||||||||||||||||||||||||
---------- | ||||||||||||||||||||||||||||
data : dask array | ||||||||||||||||||||||||||||
Array to be rechunked. | ||||||||||||||||||||||||||||
chunks : int, tuple, dict or str, optional | ||||||||||||||||||||||||||||
The new block dimensions to create. -1 indicates the full size of the | ||||||||||||||||||||||||||||
corresponding dimension. Default is "auto" which automatically | ||||||||||||||||||||||||||||
determines chunk sizes. | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Returns | ||||||||||||||||||||||||||||
------- | ||||||||||||||||||||||||||||
chunked array | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
See Also | ||||||||||||||||||||||||||||
-------- | ||||||||||||||||||||||||||||
dask.array.Array.rechunk | ||||||||||||||||||||||||||||
cubed.Array.rechunk | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if _contains_cftime_datetimes(data): | ||||||||||||||||||||||||||||
from dask import config as dask_config | ||||||||||||||||||||||||||||
from dask.array.core import normalize_chunks | ||||||||||||||||||||||||||||
from dask.utils import parse_bytes | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
from xarray.namedarray.utils import fake_target_chunksize | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) | ||||||||||||||||||||||||||||
limit, var_dtype = fake_target_chunksize( # type: ignore[var-annotated] | ||||||||||||||||||||||||||||
data, target_chunksize=target_chunksize | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
chunks = normalize_chunks( | ||||||||||||||||||||||||||||
chunks, | ||||||||||||||||||||||||||||
shape=data.shape, # type: ignore[attr-defined] | ||||||||||||||||||||||||||||
dtype=var_dtype, | ||||||||||||||||||||||||||||
limit=limit, | ||||||||||||||||||||||||||||
) # type: ignore[no-untyped-call] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
return data.rechunk(chunks, **kwargs) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
def get_auto_chunk_size(self, var: Variable) -> tuple[int, _DType]: | ||||||||||||||||||||||||||||
from dask import config as dask_config | ||||||||||||||||||||||||||||
from dask.utils import parse_bytes | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
from xarray.namedarray.utils import fake_target_chunksize | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) | ||||||||||||||||||||||||||||
return fake_target_chunksize(var, target_chunksize=target_chunksize) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Only this much is dask-specific, so that's what the DaskManager should be responsible for. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from __future__ import annotations | ||
|
||
import importlib | ||
import sys | ||
import warnings | ||
from collections.abc import Hashable, Iterable, Iterator, Mapping | ||
from functools import lru_cache | ||
|
@@ -23,7 +24,9 @@ | |
DaskArray = NDArray # type: ignore[assignment, misc] | ||
DaskCollection: Any = NDArray # type: ignore[no-redef] | ||
|
||
from xarray.namedarray._typing import _Dim, duckarray | ||
from xarray.core.variable import Variable | ||
from xarray.namedarray._typing import DuckArray, _Dim, _DType, duckarray | ||
from xarray.namedarray.parallelcompat import T_ChunkedArray | ||
|
||
|
||
K = TypeVar("K") | ||
|
@@ -195,6 +198,37 @@ def either_dict_or_kwargs( | |
return pos_kwargs | ||
|
||
|
||
def fake_target_chunksize( | ||
data: DuckArray[Any] | T_ChunkedArray | Variable, | ||
target_chunksize: int, | ||
) -> tuple[int, _DType]: | ||
""" | ||
Naughty trick - let's get the ratio of our cftime_nbytes, and then compute | ||
the ratio of that size to a np.float64. Then we can just adjust our target_chunksize | ||
and use the default dask chunking algorithm to get a reasonable chunk size. | ||
|
||
? I don't think T_chunkedArray or Variable should be necessary, but the calls | ||
? to this in daskmanager.py requires it to be that. I still need to wrap my head | ||
? around the typing here a bit more. | ||
dcherian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
import numpy as np | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's move imports to the top if we can; and remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can only move numpy to the top - moving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've removed the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looked this up subsequently and I think I'm talking waffle - the Since arrays can only contain a single dtype, this shouldn't make any difference. TLDR; ignore my previous comment, it was nonsense |
||
|
||
from xarray.core.formatting import first_n_items | ||
|
||
output_dtype: _DType = np.dtype(np.float64) # type: ignore[assignment] | ||
|
||
if data.dtype == object: | ||
nbytes_approx: int = sys.getsizeof(first_n_items(data, 1)) # type: ignore[no-untyped-call] | ||
else: | ||
nbytes_approx = data[0].itemsize | ||
|
||
f64_nbytes = output_dtype.itemsize # Should be 8 bytes | ||
|
||
target_chunksize = int(target_chunksize * (f64_nbytes / nbytes_approx)) | ||
|
||
return target_chunksize, output_dtype | ||
|
||
|
||
class ReprObject: | ||
"""Object that prints as the given value, for use with sentinel values.""" | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,8 @@ | |
from typing import TYPE_CHECKING, Any, Literal, TypeVar, Union, overload | ||
|
||
from xarray.core import utils | ||
from xarray.core.utils import emit_user_level_warning | ||
from xarray.core.common import _contains_cftime_datetimes | ||
from xarray.core.utils import emit_user_level_warning, is_dict_like | ||
from xarray.core.variable import IndexVariable, Variable | ||
from xarray.namedarray.parallelcompat import ( | ||
ChunkManagerEntrypoint, | ||
|
@@ -83,8 +84,23 @@ def _get_chunk(var: Variable, chunks, chunkmanager: ChunkManagerEntrypoint): | |
for dim, preferred_chunk_sizes in zip(dims, preferred_chunk_shape, strict=True) | ||
) | ||
|
||
# Chunks can be either dict-like or tuple-like (according to type annotations) | ||
# at this point, so check for # this before we manually construct our chunk | ||
# spec- if we've set chunks to auto | ||
_chunks = list(chunks.values()) if is_dict_like(chunks) else chunks | ||
auto_chunks = all(_chunk == "auto" for _chunk in _chunks) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think technically a subset of this tuple can be "auto" but we can ignore this wrinkle for now. |
||
|
||
if _contains_cftime_datetimes(var) and auto_chunks: | ||
limit, var_dtype = chunkmanager.get_auto_chunk_size(var) | ||
else: | ||
limit, var_dtype = None, var.dtype | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic would change to use |
||
|
||
chunk_shape = chunkmanager.normalize_chunks( | ||
chunk_shape, shape=shape, dtype=var.dtype, previous_chunks=preferred_chunk_shape | ||
chunk_shape, | ||
shape=shape, | ||
dtype=var_dtype, | ||
limit=limit, | ||
previous_chunks=preferred_chunk_shape, | ||
) | ||
|
||
# Warn where requested chunks break preferred chunks, provided that the variable | ||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5427,6 +5427,35 @@ def test_open_multi_dataset(self) -> None: | |||||||||
) as actual: | ||||||||||
assert_identical(expected, actual) | ||||||||||
|
||||||||||
def test_open_dataset_cftime_autochunk(self) -> None: | ||||||||||
dcherian marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
"""Create a dataset with cftime datetime objects and | ||||||||||
ensure that auto-chunking works correctly.""" | ||||||||||
import cftime | ||||||||||
|
||||||||||
from xarray.core.common import _contains_cftime_datetimes | ||||||||||
dcherian marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
original = xr.Dataset( | ||||||||||
{ | ||||||||||
"foo": ("time", [0.0]), | ||||||||||
"time_bnds": ( | ||||||||||
("time", "bnds"), | ||||||||||
[ | ||||||||||
[ | ||||||||||
cftime.Datetime360Day(2005, 12, 1, 0, 0, 0, 0), | ||||||||||
cftime.Datetime360Day(2005, 12, 2, 0, 0, 0, 0), | ||||||||||
] | ||||||||||
], | ||||||||||
), | ||||||||||
}, | ||||||||||
{"time": [cftime.Datetime360Day(2005, 12, 1, 12, 0, 0, 0)]}, | ||||||||||
) | ||||||||||
with create_tmp_file() as tmp: | ||||||||||
original.to_netcdf(tmp) | ||||||||||
with open_dataset(tmp, chunks="auto") as actual: | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
assert isinstance(actual.time_bnds.variable.data, da.Array) | ||||||||||
assert _contains_cftime_datetimes(actual.time) | ||||||||||
assert_identical(original, actual) | ||||||||||
|
||||||||||
# Flaky test. Very open to contributions on fixing this | ||||||||||
@pytest.mark.flaky | ||||||||||
def test_dask_roundtrip(self) -> None: | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this can be deleted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a play and I don't think I can fully get rid of it, I've reused as much of the abstracted logic as possible though.