Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7b126e5
silly enter in docstring, testing if I can push to fork
semvijverberg Jan 8, 2024
bcaf6af
dropna
Jan 8, 2024
7d77a6b
delete print statements
Jan 9, 2024
612835b
add docstrings
Jan 9, 2024
7862cce
Added test for wanted behavior with NaNS. Fixed floating point precis…
semvijverberg Jan 10, 2024
6dd695d
added tests for nans in timeseries, added dropna arg to fit_transform
semvijverberg Jan 10, 2024
8fecf80
subtract clim for test
semvijverberg Jan 10, 2024
d65f029
Fixed nan handling
semvijverberg Jan 10, 2024
5925358
Fix nan handling
semvijverberg Jan 10, 2024
8199ca8
polynomial detrending
Feb 5, 2024
38476ce
Revert "polynomial detrending"
Feb 5, 2024
796c3d0
add poly detrending
Feb 5, 2024
7d8ba54
adapt things lost in merge
Feb 5, 2024
b52ade9
remove unneeded comments
Feb 5, 2024
af6a48e
fix imports
Feb 5, 2024
e41b65c
remove unused line
Feb 5, 2024
a36b1af
edit nan return statement
Feb 5, 2024
ef39770
adding get_trend_timeseries and tests
semvijverberg Feb 5, 2024
def3069
hatch run format
semvijverberg Feb 5, 2024
9f07fb2
remnant type
semvijverberg Feb 5, 2024
5c1a750
Merge pull request #2 from Beyond-Weather-Git/poly_dropna2
semvijverberg Feb 5, 2024
748ec9f
minor fixes polyfit
semvijverberg Feb 5, 2024
abb8c20
added test for ds and da fit_transform
semvijverberg Feb 6, 2024
bb54c3f
Update
semvijverberg Feb 6, 2024
65116ba
fix transform with unequal date_range ordinal day
semvijverberg Feb 6, 2024
8d91388
Merge pull request #3 from Beyond-Weather-Git/minor_fixes_polyfit
semvijverberg Feb 12, 2024
1b0b2d4
Get trend timeseries of same coords as input data.
semvijverberg Feb 12, 2024
bb0189f
add [...] to transpose to allow for dimensions not present
semvijverberg Feb 12, 2024
4d03c22
added get_climatology_timeseries support
semvijverberg Feb 21, 2024
fdca588
oeps fix for different timescales
semvijverberg Feb 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ python_version = "3.9"

[tool.black]
line-length = 88
src_paths = ["s2spy", "tests"]
# src_paths = ["s2spy", "tests"]

[tool.ruff]
select = [
Expand Down Expand Up @@ -178,7 +178,7 @@ skip_glob = ["docs/*"]
force_single_line = true
lines_after_imports = 2
known_first_party = ["s2spy"]
src_paths = ["s2spy", "tests"]
# src_paths = ["s2spy", "tests"]
line_length = 120
no_lines_before = ["FUTURE","STDLIB","THIRDPARTY","FIRSTPARTY","LOCALFOLDER"]

Expand Down
1 change: 1 addition & 0 deletions s2spy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This package is a high-level python package integrating expert knowledge
and artificial intelligence to boost (sub) seasonal forecasting.
"""

import logging
from .rgdr.rgdr import RGDR

Expand Down
253 changes: 238 additions & 15 deletions s2spy/preprocess.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Preprocessor for s2spy workflow."""

import warnings
from typing import Literal
from typing import Union
Expand All @@ -7,58 +8,186 @@
import xarray as xr


def _linregress(x: np.ndarray, y: np.ndarray) -> tuple[float, float]:
def _linregress(
x: np.ndarray,
y: np.ndarray,
nan_mask: Literal["individual", "complete"] = "individual",
) -> tuple[float, float]:
"""Calculate the slope and intercept between two arrays using scipy's linregress.

Used to make linregress more ufunc-friendly.


Args:
x: First array.
y: Second array.
nan_mask: How to handle NaN values. If 'complete', returns nan if x or y contains
1 or more NaN values. If 'individual', fit a trend by masking only the
indices with the NaNs.

Returns:
slope, intercept
"""
slope, intercept, _, _, _ = scipy.stats.linregress(x, y)
return slope, intercept


def _trend_linear(data: Union[xr.DataArray, xr.Dataset]) -> dict:
if nan_mask == "individual":
mask = np.logical_or(np.isnan(x), np.isnan(y))
if np.all(mask):
slope, intercept = np.nan, np.nan
elif np.any(mask):
slope, intercept, _, _, _ = scipy.stats.linregress(x[~mask], y[~mask])
else:
slope, intercept, _, _, _ = scipy.stats.linregress(x, y)
return slope, intercept
elif nan_mask == "complete":
if np.logical_or(np.isnan(x), np.isnan(y)).any():
slope, intercept = np.nan, np.nan # any NaNs in timeseries, return NaN.
slope, intercept, _, _, _ = scipy.stats.linregress(x, y)
return slope, intercept


def _trend_linear(
data: Union[xr.DataArray, xr.Dataset], nan_mask: str = "complete"
) -> dict:
"""Calculate the linear trend over time.

Args:
data: The input data of which you want to know the trend.
nan_mask: How to handle NaN values. If 'complete', returns nan if x or y contains
1 or more NaN values. If 'individual', fit a trend by masking only the
indices with the NaNs.

Returns:
Dictionary containing the linear trend information (slope and intercept)
"""
assert nan_mask in [
"complete",
"individual",
], "nan_mask should be 'complete' or 'individual'"
slope, intercept = xr.apply_ufunc(
_linregress,
data["time"].astype(float),
data,
input_core_dims=[["time"], ["time"]],
nan_mask,
input_core_dims=[["time"], ["time"], []],
output_core_dims=[[], []],
vectorize=True,
)
return {"slope": slope, "intercept": intercept}


def _get_lineartrend_timeseries(data: Union[xr.DataArray, xr.Dataset], trend: dict):
"""Calculate the linear trend timeseries from the trend dictionary."""
trend_timeseries = trend["intercept"] + trend["slope"] * (
data["time"].astype(float)
)
return trend_timeseries.transpose(*list(data.dims) + [...])


def _trend_poly(data: Union[xr.DataArray, xr.Dataset], degree: int = 2) -> dict:
"""Calculate the polynomial trend over time and return coefficients.

Args:
data: Input data.
degree: Degree of the polynomial for detrending.

Returns:
Dictionary containing polynomial trend coefficients.
"""
fixed_timestamp = np.datetime64("1900-01-01")
data.coords["ordinal_day"] = (
("time",),
(data.time - fixed_timestamp).values.astype("timedelta64[D]").astype(int),
)
coeffs = data.swap_dims({"time": "ordinal_day"}).polyfit(
"ordinal_day", deg=degree, skipna=True
)
return {"coefficients": coeffs}


def _get_polytrend_timeseries(data: Union[xr.DataArray, xr.Dataset], trend: dict):
"""Calculate the polynomial trend timeseries from the trend dictionary."""
fixed_timestamp = np.datetime64("1900-01-01")
polynomial_trend = (
xr.polyval(
data.assign_coords(
ordinal_day=(
"time",
(data.time - fixed_timestamp)
.values.astype("timedelta64[D]")
.astype(int),
)
).swap_dims({"time": "ordinal_day"})["ordinal_day"],
trend["coefficients"],
).swap_dims({"ordinal_day": "time"})
).drop_vars("ordinal_day")
# data = data # remove the ordinal_day coordinate
# rename f"{data_var}_polyfit_coeffiencts" to orginal data_var name
if isinstance(data, xr.Dataset):
da_names = list(data.data_vars)
rename = dict(
map(
lambda i, j: (i, j),
list(polynomial_trend.data_vars),
da_names,
)
)
polynomial_trend = polynomial_trend.rename(rename)
if isinstance(
data, xr.DataArray
): # keep consistent with input data and _get_lineartrend_timeseries
polynomial_trend = (
polynomial_trend.to_array().squeeze("variable").drop_vars("variable")
)
polynomial_trend.name = (
data.name if data.name is not None else "timeseries_polyfit"
)
return polynomial_trend.transpose(*list(data.dims) + [...])


def _subtract_linear_trend(data: Union[xr.DataArray, xr.Dataset], trend: dict):
"""Subtract a previously calclulated linear trend from (new) data."""
return data - trend["intercept"] - trend["slope"] * (data["time"].astype(float))
return data - _get_lineartrend_timeseries(data, trend)


def _subtract_polynomial_trend(
data: Union[xr.DataArray, xr.Dataset],
trend: dict,
):
"""Subtract a previously calculated polynomial trend from (new) data.

Args:
data: The data from which to subtract the trend (either an xarray DataArray or Dataset).
trend: A dictionary containing the polynomial trend coefficients.

Returns:
The data with the polynomial trend subtracted.
"""
# Subtract the polynomial trend from the data
return data - _get_polytrend_timeseries(data, trend)


def _get_trend(data: Union[xr.DataArray, xr.Dataset], method: str):
def _get_trend(
data: Union[xr.DataArray, xr.Dataset],
method: str,
nan_mask: str = "complete",
degree=2,
):
"""Calculate the trend, with a certain method. Only linear is implemented."""
if method == "linear":
return _trend_linear(data)
return _trend_linear(data, nan_mask)

if method == "polynomial":
if nan_mask != "complete":
raise ValueError("Polynomial currently only supports 'complete' nan_mask")
return _trend_poly(data, degree)
raise ValueError(f"Unkown detrending method '{method}'")


def _subtract_trend(data: Union[xr.DataArray, xr.Dataset], method: str, trend: dict):
"""Subtract the previously calculated trend from (new) data. Only linear is implemented."""
if method == "linear":
return _subtract_linear_trend(data, trend)
if method == "polynomial":
return _subtract_polynomial_trend(data, trend)
raise NotImplementedError


Expand All @@ -80,6 +209,20 @@ def _get_climatology(
return climatology


def climatology_to_timeseries(
group,
climatology,
timescale: Literal["monthly", "weekly", "daily"],
):
"""Convert climatology to timeseries."""
if timescale == "monthly":
return climatology.sel(month=group.time.dt.month).drop_vars("month")
elif timescale == "weekly":
return climatology.sel(week=group.time.dt.isocalendar().week).drop_vars("week")
elif timescale == "daily":
return climatology.sel(dayofyear=group.time.dt.dayofyear).drop_vars("dayofyear")


def _subtract_climatology(
data: Union[xr.Dataset, xr.DataArray],
timescale: Literal["monthly", "weekly", "daily"],
Expand Down Expand Up @@ -172,6 +315,7 @@ def __init__( # noqa: PLR0913
rolling_min_periods: int = 1,
subtract_climatology: bool = True,
detrend: Union[str, None] = "linear",
nan_mask: str = "complete",
):
"""Preprocessor for s2s data. Can detrend as well as deseasonalize.

Expand All @@ -194,14 +338,20 @@ def __init__( # noqa: PLR0913
and end of the preprocessed data.
subtract_climatology (optional): If you want to calculate and remove the
climatology of the data. Defaults to True.
detrend (optional): Which method to use for detrending. Currently the only method
supported is "linear". If you want to skip detrending, set this to None.
detrend (optional): Which method to use for detrending. Choose from "linear"
or "polynomial". Defaults to "linear". If you want to skip detrending,
set this to None.
timescale: Temporal resolution of input data.
nan_mask: How to handle NaN values. If 'complete', returns nan if x or y contains
1 or more NaN values. If 'individual', fit a trend by masking only the
indices with the NaNs.
"""
self._window_size = rolling_window_size
self._min_periods = rolling_min_periods
self._detrend = detrend
self._subtract_climatology = subtract_climatology
self._nan_mask = nan_mask

if subtract_climatology:
self._timescale = _check_temporal_resolution(timescale)

Expand All @@ -228,15 +378,14 @@ def fit(self, data: Union[xr.DataArray, xr.Dataset]) -> None:

if self._subtract_climatology:
self._climatology = _get_climatology(data_rolling, self._timescale)

if self._detrend is not None:
if self._subtract_climatology:
deseasonalized = _subtract_climatology(
data_rolling, self._timescale, self._climatology
)
self._trend = _get_trend(deseasonalized, self._detrend)
self._trend = _get_trend(deseasonalized, self._detrend, self._nan_mask)
else:
self._trend = _get_trend(data_rolling, self._detrend)
self._trend = _get_trend(data_rolling, self._detrend, self._nan_mask)

self._is_fit = True

Expand Down Expand Up @@ -281,6 +430,61 @@ def fit_transform(
self.fit(data)
return self.transform(data)

def get_trend_timeseries(self, data, align_coords: bool = False):
"""Get the trend timeseries from the data.

Args:
data (xr.DataArray or xr.Dataset): input data.
align_coords (bool): Construction of trend timeseries only uses the time
coordinate. Hence, your input to-be-transformed-array might be a subset
of the original array, but it will still return the original coordinates
(as stored in the trendline). With align_coords=True, the trend
timeseries will be aligned to the data passed to this function.
"""
if not self._is_fit:
raise ValueError(
"The preprocessor has to be fit to data before the trend"
" timeseries can be requested."
)
if self._detrend is None:
raise ValueError("Detrending is set to `None`, so no trend is available")
if align_coords:
trend = self.align_trend_coords(data)
else:
trend = self.trend
if self._detrend == "linear":
return _get_lineartrend_timeseries(data, trend)
elif self._detrend == "polynomial":
return _get_polytrend_timeseries(data, trend)
raise ValueError(f"Unkown detrending method '{self._detrend}'")

def get_climatology_timeseries(self, data, align_coords: bool = False):
"""Get the climatology timeseries from the data.

Args:
data (xr.DataArray or xr.Dataset): input data.
align_coords (bool): Construction of climatology timeseries only uses the time
coordinate. This aligns the coords to the data. See
`get_trend_timeseries` for more information.
"""
if not self._is_fit:
raise ValueError(
"The preprocessor has to be fit to data before the trend"
" timeseries can be requested."
)
if not self._subtract_climatology:
raise ValueError(
"`subtract_climatology is set to `False`, so no climatology "
"data is available"
)
if align_coords:
climatology = xr.align(self.climatology, data)[0]
else:
climatology = self.climatology
return data.groupby("time.year").map(
lambda x: climatology_to_timeseries(x, climatology, self._timescale)
)

@property
def trend(self) -> dict:
"""Return the stored trend (dictionary)."""
Expand All @@ -307,3 +511,22 @@ def climatology(self) -> Union[xr.DataArray, xr.Dataset]:
" climatology can be requested."
)
return self._climatology

def align_trend_coords(self, data):
"""Align coordinates between data and trend.

Args:
data (xr.DataArray or xr.Dataset): time, (lat), (lon) array.
trend ():
"""
if self._detrend == "linear":
align_trend = self.trend.copy()
align_trend["intercept"], align_trend["slope"] = xr.align(
*[align_trend["intercept"], align_trend["slope"], data]
)[:2]
elif self._detrend == "polynomial":
align_trend = self.trend.copy()
align_trend["coefficients"] = xr.align(align_trend["coefficients"], data)[0]
else:
raise ValueError(f"Unkown detrending method '{self._detrend}'")
return align_trend
1 change: 1 addition & 0 deletions s2spy/rgdr/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Response Guided Dimensionality Reduction."""

from . import label_alignment # noqa: F401 (unused import)
1 change: 1 addition & 0 deletions s2spy/rgdr/label_alignment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Label alignment tools for RGDR clusters."""

import itertools
import string
from copy import copy
Expand Down
Loading