Skip to content

Commit 16b6a24

Browse files
authored
Merge pull request #11 from CU-ESIIL/codex/add-pipe-system-and-ops-layout
Add pipe helper and ops verbs
2 parents 7c6e141 + ab0c206 commit 16b6a24

File tree

9 files changed

+281
-1
lines changed

9 files changed

+281
-1
lines changed

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,21 @@ var_cube = cd.variance_cube(cube)
5252
var_cube.to_netcdf("gridmet_variance.nc")
5353
```
5454

55+
Pipe ggplot-style operations with ``|`` for quick cube math:
56+
57+
```python
58+
import cubedynamics as cd
59+
60+
cube = ... # any xarray DataArray or Dataset
61+
62+
result = (
63+
cd.pipe(cube)
64+
| cd.anomaly(dim="time")
65+
| cd.month_filter([6, 7, 8])
66+
| cd.variance(dim="time")
67+
).unwrap()
68+
```
69+
5570
Additional helpers can build NDVI z-score cubes, compute rolling correlation vs
5671
an anchor pixel, or export “lexcubes” for downstream dashboards. Follow the docs
5772
for more end-to-end examples while the streaming implementations are finalized.

docs/getting_started.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,38 @@ print(cube)
5959

6060
If the call succeeds you are ready to work through the [Concepts](concepts.md)
6161
and [API & Examples](climate_cubes.md) sections.
62+
63+
## Pipe syntax (ggplot-style verbs)
64+
65+
CubeDynamics now exposes a ``pipe()`` helper and pipeable verbs under
66+
``cubedynamics.ops``. Wrap any xarray ``DataArray`` or ``Dataset`` with
67+
``cd.pipe()`` and compose operations with the ``|`` operator:
68+
69+
```python
70+
import cubedynamics as cd
71+
72+
cube = ...
73+
74+
result = (
75+
cd.pipe(cube)
76+
| cd.anomaly(dim="time")
77+
| cd.month_filter([6, 7, 8])
78+
| cd.variance(dim="time")
79+
| cd.to_netcdf("out.nc")
80+
).unwrap()
81+
```
82+
83+
Pipeable verbs are factories: calling one returns a function that takes the cube and returns the result. You can create your own by following the same
84+
pattern:
85+
86+
```python
87+
def my_custom_op(scale):
    """Build a pipeable verb that multiplies the incoming cube by ``scale``."""

    def _scaled(cube):
        # The factory argument is captured by the closure and applied when the
        # pipe hands the cube to this stage.
        product = cube * scale
        return product

    return _scaled
91+
92+
result = (cd.pipe(cube) | my_custom_op(0.5)).unwrap()
93+
```
94+
95+
This keeps the streaming-first philosophy: each verb simply transforms or
96+
reduces the cube provided by the pipe without forcing eager downloads. Side-effect verbs such as ``to_netcdf`` are the exception: writing the file computes the cube.

src/cubedynamics/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"""
99

1010
from .version import __version__
11+
from .piping import Pipe, pipe
1112

1213
# Legacy, fully implemented APIs -------------------------------------------------
1314
from .data.gridmet import load_gridmet_cube
@@ -30,7 +31,8 @@
3031
# Streaming-first stubs for the new architecture ---------------------------------
3132
from .gridmet_streaming import stream_gridmet_to_cube
3233
from .prism_streaming import stream_prism_to_cube
33-
from .correlation_cubes import correlation_cube
34+
from .correlation_cubes import correlation_cube as streaming_correlation_cube
35+
from .ops import anomaly, month_filter, variance, correlation_cube, to_netcdf
3436

3537
__all__ = [
3638
"__version__",
@@ -54,5 +56,12 @@
5456
# Streaming-first stubs -------------------------------------------------------
5557
"stream_gridmet_to_cube",
5658
"stream_prism_to_cube",
59+
"Pipe",
60+
"pipe",
61+
"anomaly",
62+
"month_filter",
63+
"variance",
5764
"correlation_cube",
65+
"to_netcdf",
66+
"streaming_correlation_cube",
5867
]

src/cubedynamics/ops/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
"""Pipeable cube operations."""
2+
3+
from .transforms import anomaly, month_filter
4+
from .stats import variance, correlation_cube
5+
from .io import to_netcdf
6+
7+
__all__ = [
8+
"anomaly",
9+
"month_filter",
10+
"variance",
11+
"correlation_cube",
12+
"to_netcdf",
13+
]

src/cubedynamics/ops/io.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
"""I/O helpers for pipe chains."""
2+
3+
from __future__ import annotations
4+
5+
import xarray as xr
6+
7+
8+
def to_netcdf(path: str, **to_netcdf_kwargs):
    """Build a pipe stage that writes the incoming cube to ``path``.

    The returned callable invokes ``.to_netcdf(path, **to_netcdf_kwargs)`` on
    the object flowing through the pipe and then hands that same object on, so
    the write is a side effect and the chain can continue afterwards.

    Parameters
    ----------
    path:
        Destination forwarded unchanged as the first argument of
        ``.to_netcdf``.
    **to_netcdf_kwargs:
        Extra keyword arguments forwarded unchanged to ``.to_netcdf``.
    """

    def _write_and_forward(cube: xr.DataArray | xr.Dataset):
        # The return value of ``.to_netcdf`` is deliberately discarded; the
        # stage passes the original object through untouched.
        cube.to_netcdf(path, **to_netcdf_kwargs)
        return cube

    return _write_and_forward
16+
17+
18+
__all__ = ["to_netcdf"]

src/cubedynamics/ops/stats.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Statistical pipeable operations."""
2+
3+
from __future__ import annotations
4+
5+
import xarray as xr
6+
7+
8+
def variance(dim: str = "time"):
    """Build a pipe stage that reduces the incoming cube with ``.var`` over ``dim``.

    Parameters
    ----------
    dim:
        Name of the dimension to collapse.

    Returns
    -------
    callable
        A function taking the cube and returning ``cube.var(dim=dim)``. It
        raises ``ValueError`` when ``dim`` is not among the cube's dimensions.
    """

    def _reduce(cube: xr.DataArray | xr.Dataset):
        if dim in cube.dims:
            return cube.var(dim=dim)
        # Fail with a message that names the dimension and what was available.
        raise ValueError(f"Dimension {dim!r} not found in object dims: {cube.dims}")

    return _reduce
17+
18+
19+
def correlation_cube(other: xr.DataArray | xr.Dataset | None, dim: str = "time"):
    """Placeholder factory for a correlation verb that is not implemented yet.

    Parameters
    ----------
    other:
        The comparison cube. ``None`` is rejected immediately.
    dim:
        Name of the dimension the correlation would run over once the verb is
        implemented. Anything that is not a ``str`` is rejected immediately.

    Raises
    ------
    NotImplementedError
        At factory time when ``other`` is ``None`` or ``dim`` is not a string,
        and otherwise as soon as the returned stage is called.
    """

    arguments_usable = other is not None and isinstance(dim, str)
    if not arguments_usable:
        raise NotImplementedError("correlation_cube is not implemented yet.")

    def _stub(cube: xr.DataArray | xr.Dataset):  # pragma: no cover - stub
        raise NotImplementedError("correlation_cube is not implemented yet.")

    return _stub
37+
38+
39+
__all__ = ["variance", "correlation_cube"]

src/cubedynamics/ops/transforms.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""Transform-style pipeable operations."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import Iterable
6+
7+
import xarray as xr
8+
9+
10+
def anomaly(dim: str = "time"):
    """Build a pipe stage that removes the mean over ``dim`` from the cube.

    The returned callable computes ``cube - cube.mean(dim=dim)`` and is meant
    to be used as a stage of a :class:`~cubedynamics.piping.Pipe` chain via
    ``|``.

    Parameters
    ----------
    dim:
        Name of the dimension whose mean is subtracted.

    Returns
    -------
    callable
        A function taking the cube and returning its anomaly. It raises
        ``ValueError`` when ``dim`` is not among the cube's dimensions.
    """

    def _center(cube: xr.DataArray | xr.Dataset):
        if dim not in cube.dims:
            raise ValueError(f"Dimension {dim!r} not found in object dims: {cube.dims}")
        baseline = cube.mean(dim=dim)
        return cube - baseline

    return _center
24+
25+
26+
def month_filter(months: Iterable[int]):
    """Factory for a verb that keeps only the requested calendar months.

    Parameters
    ----------
    months:
        Iterable of integers (1-12) to keep. Values are coerced with ``int``.
        An empty iterable is allowed and yields an empty selection.

    Returns
    -------
    callable
        A function taking a ``DataArray`` or ``Dataset`` that has a
        datetime-like ``time`` coordinate and returning the subset whose
        ``time`` values fall in ``months``.

    Raises
    ------
    ValueError
        At factory time if any month is outside 1-12; at call time if the
        object has no ``time`` coordinate or that coordinate is not
        datetime-like.
    """

    months = tuple(int(m) for m in months)
    # An out-of-range month can never match and would silently produce an
    # empty cube, so reject it up front where the mistake was made.
    invalid = [m for m in months if not 1 <= m <= 12]
    if invalid:
        raise ValueError(f"month_filter: months must be in 1-12, got {invalid}.")

    def _inner(da: xr.DataArray | xr.Dataset):
        if "time" not in da.coords:
            raise ValueError("month_filter requires a 'time' coordinate.")
        time = da["time"]
        try:
            month_vals = time.dt.month
        except (AttributeError, TypeError) as exc:
            # xarray signals a non-datetime ``.dt`` access with one of these
            # two types depending on version; anything else is a real bug and
            # should propagate unchanged.
            raise ValueError("month_filter: 'time' coordinate must be datetime-like.") from exc
        mask = month_vals.isin(months)
        if mask.ndim == 1:
            # Positional selection keeps the original dtype and leaves Dataset
            # variables without this dimension untouched. ``where`` would
            # promote integers to float and broadcast the mask onto them.
            return da.isel({mask.dims[0]: mask.values})
        # Scalar or multi-dimensional time coordinates cannot be expressed as a
        # 1-D positional index; keep the masking behaviour for those.
        return da.where(mask, drop=True)

    return _inner
50+
51+
52+
__all__ = ["anomaly", "month_filter"]

src/cubedynamics/piping.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Pipe wrapper enabling ``|`` composition for cube operations."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Callable, Generic, TypeVar
6+
7+
8+
T = TypeVar("T")
U = TypeVar("U")


class Pipe(Generic[T]):
    """Wrap a value so it can flow through ``|`` pipe stages.

    Each ``pipe | func`` calls ``func`` with the wrapped value and wraps the
    result in a new :class:`Pipe`; the original instance is left untouched.
    """

    def __init__(self, value: T) -> None:
        self.value = value

    def __repr__(self) -> str:
        # Show the wrapped value so a forgotten ``.unwrap()`` is obvious in a
        # REPL or notebook instead of printing an opaque object address.
        return f"{type(self).__name__}({self.value!r})"

    def __or__(self, func: Callable[[T], U]) -> "Pipe[U]":
        """Apply ``func`` to the wrapped value and return a new :class:`Pipe`.

        Raises
        ------
        TypeError
            If ``func`` is not callable.
        """

        if not callable(func):
            # Same exception type Python would raise on the call itself, but
            # with a message that points at the pipe stage.
            raise TypeError(
                f"Pipe stages must be callable, got {type(func).__name__!r}"
            )
        return Pipe(func(self.value))

    def unwrap(self) -> T:
        """Return the wrapped value, ending the pipe chain."""

        return self.value

    @property
    def v(self) -> T:
        """Convenience alias for :attr:`value`."""

        return self.value
36+
37+
38+
def pipe(value: T) -> Pipe[T]:
    """Start a pipe chain by wrapping ``value`` in a :class:`Pipe`.

    The result supports ``Pipe | op(...)`` composition; call ``.unwrap()`` at
    the end of the chain to get the final value back.

    Example
    -------
    >>> (pipe(1) | (lambda x: x + 2)).unwrap()
    3
    """

    wrapped = Pipe(value)
    return wrapped
48+
49+
50+
__all__ = ["Pipe", "pipe"]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""Tests for the ggplot-style pipe syntax and verbs."""
2+
3+
from __future__ import annotations
4+
5+
import numpy as np
6+
import pandas as pd
7+
import xarray as xr
8+
9+
import cubedynamics as cd
10+
11+
12+
def _make_time_series(count: int = 12):
    """Return a 1-D series ``0.0 .. count-1`` on month-start dates from 2000-01-01."""
    month_starts = pd.date_range("2000-01-01", periods=count, freq="MS")
    values = np.arange(count, dtype=float)
    return xr.DataArray(values, dims=("time",), coords={"time": month_starts})
20+
21+
22+
def test_pipe_basic_chain():
    """Anomaly followed by variance collapses ``time`` to the series variance."""
    da = _make_time_series()

    result = (cd.pipe(da) | cd.anomaly(dim="time") | cd.variance(dim="time")).unwrap()

    assert isinstance(result, xr.DataArray)
    assert result.dims == ()
    # Subtracting the mean does not change the spread, so the chain must equal
    # the population variance of the raw values. A bare ``>= 0`` check would
    # pass for almost any implementation.
    np.testing.assert_allclose(float(result), np.var(da.values))
30+
31+
32+
def test_month_filter_reduces_time():
    """Filtering two years of monthly data to JJA keeps exactly six steps."""
    da = _make_time_series(24)

    summer = (cd.pipe(da) | cd.month_filter([6, 7, 8])).unwrap()

    assert set(int(m) for m in summer["time"].dt.month.values) == {6, 7, 8}
    # Two years x three summer months; also pin which elements survived so a
    # filter that keeps the right months but the wrong data cannot pass.
    assert summer.sizes["time"] == 6
    np.testing.assert_array_equal(summer.values, [5, 6, 7, 17, 18, 19])
38+
39+
40+
def test_to_netcdf_roundtrip(tmp_path):
    """The ``to_netcdf`` verb writes the file and passes the cube through."""
    original = _make_time_series()
    target = tmp_path / "out.nc"

    piped = cd.pipe(original) | cd.to_netcdf(target)
    returned = piped.unwrap()

    assert target.exists()
    reloaded = xr.load_dataarray(target)
    # Both the data on disk and the object handed on by the stage must match
    # the input exactly.
    xr.testing.assert_identical(original, reloaded)
    xr.testing.assert_identical(original, returned)

0 commit comments

Comments
 (0)