-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add chunks='auto' support for cftime datasets #10527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 12 commits
eb1a967
852476d
c921c59
1aba531
9429c3d
3c9d27e
5153d2d
62e71e6
cfdc31b
2f16bc7
ce720fa
4fa58c1
e58d6d7
590e503
f953976
6706524
4e56acd
0d008cd
49c4e9c
4594099
5d00b0a
80421ef
d1f7ad3
1b7de62
4407185
d8f45b2
20226c1
11ac9f0
8485df5
2c27877
0983261
c4ec31f
adbf5b2
6c93bc4
74bc0ea
0b9bbd0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from __future__ import annotations | ||
|
||
import importlib | ||
import sys | ||
import warnings | ||
from collections.abc import Hashable, Iterable, Iterator, Mapping | ||
from functools import lru_cache | ||
|
@@ -16,6 +17,8 @@ | |
|
||
from numpy.typing import NDArray | ||
|
||
from xarray.namedarray.parallelcompat import T_ChunkedArray | ||
|
||
try: | ||
from dask.array.core import Array as DaskArray | ||
from dask.typing import DaskCollection | ||
|
@@ -195,6 +198,30 @@ def either_dict_or_kwargs( | |
return pos_kwargs | ||
|
||
|
||
def build_chunkspec( | ||
data: T_ChunkedArray, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be "duck array" |
||
target_chunksize: int, | ||
) -> tuple[int, ...]: | ||
""" | ||
Try to make chunks roughly cubic. This needs to be a bit smarter, it | ||
really ought to account for xr.structure.chunks._getchunk and try to | ||
use the default encoding to set the chunk size. | ||
""" | ||
from xarray.core.formatting import first_n_items | ||
|
||
cftime_nbytes_approx: int = sys.getsizeof(first_n_items(data, 1)) # type: ignore[no-untyped-call] | ||
elements_per_chunk = target_chunksize // cftime_nbytes_approx | ||
ndim = data.ndim # type:ignore[attr-defined] | ||
shape = data.shape # type:ignore[attr-defined] | ||
if ndim > 0: | ||
chunk_size_per_dim = int(elements_per_chunk ** (1.0 / ndim)) | ||
chunks = tuple(min(chunk_size_per_dim, dim_size) for dim_size in shape) | ||
else: | ||
chunks = () | ||
|
||
return chunks | ||
|
||
|
||
class ReprObject: | ||
"""Object that prints as the given value, for use with sentinel values.""" | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -11,6 +11,7 @@ | |||||||||||||||||||||||||||||||||
from typing import TYPE_CHECKING, Any, Literal, TypeVar, Union, overload | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
from xarray.core import utils | ||||||||||||||||||||||||||||||||||
from xarray.core.common import _contains_cftime_datetimes | ||||||||||||||||||||||||||||||||||
from xarray.core.utils import emit_user_level_warning | ||||||||||||||||||||||||||||||||||
from xarray.core.variable import IndexVariable, Variable | ||||||||||||||||||||||||||||||||||
from xarray.namedarray.parallelcompat import ( | ||||||||||||||||||||||||||||||||||
|
@@ -83,9 +84,27 @@ def _get_chunk(var: Variable, chunks, chunkmanager: ChunkManagerEntrypoint): | |||||||||||||||||||||||||||||||||
for dim, preferred_chunk_sizes in zip(dims, preferred_chunk_shape, strict=True) | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
chunk_shape = chunkmanager.normalize_chunks( | ||||||||||||||||||||||||||||||||||
chunk_shape, shape=shape, dtype=var.dtype, previous_chunks=preferred_chunk_shape | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
if _contains_cftime_datetimes(var): | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||
# If we have cftime datetimes, need to preprocess them - we can't pass | ||||||||||||||||||||||||||||||||||
# an object dtype into DaskManager.normalize_chunks. | ||||||||||||||||||||||||||||||||||
from dask import config as dask_config | ||||||||||||||||||||||||||||||||||
from dask.utils import parse_bytes | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
from xarray.namedarray.utils import build_chunkspec | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding cc @TomNicholas |
||||||||||||||||||||||||||||||||||
chunk_shape = build_chunkspec(var, target_chunksize=target_chunksize) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
chunk_shape = chunkmanager.normalize_chunks( | ||||||||||||||||||||||||||||||||||
chunk_shape, shape=shape, previous_chunks=preferred_chunk_shape | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||
chunk_shape = chunkmanager.normalize_chunks( | ||||||||||||||||||||||||||||||||||
chunk_shape, | ||||||||||||||||||||||||||||||||||
shape=shape, | ||||||||||||||||||||||||||||||||||
dtype=var.dtype, | ||||||||||||||||||||||||||||||||||
previous_chunks=preferred_chunk_shape, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no if _contains_cftime_datetimes(var):
...
chunk_shape = build_chunkspec(...)
var_dtype = None
else:
var_dtype = var.dtype
chunk_shape = chunkmanager.normalize_chunks(
chunk_shape,
shape=shape,
dtype=var_dtype,
previous_chunks=preferred_chunk_shape,
) which seems cleaner than what I've currently got? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignore that, I've changed how this works to allow us to use the dask native chunk normalization - by computing a ratio of sizes to a |
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Warn where requested chunks break preferred chunks, provided that the variable | ||||||||||||||||||||||||||||||||||
# contains data. | ||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this can be deleted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a play and I don't think I can fully get rid of it, I've reused as much of the abstracted logic as possible though.