diff --git a/pyproject.toml b/pyproject.toml
index f7c234b..996fec7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -120,7 +120,7 @@ python_version = "3.9"
 
 [tool.black]
 line-length = 88
-src_paths = ["s2spy", "tests"]
+# src_paths = ["s2spy", "tests"]
 
 [tool.ruff]
 select = [
@@ -178,7 +178,7 @@ skip_glob = ["docs/*"]
 force_single_line = true
 lines_after_imports = 2
 known_first_party = ["s2spy"]
-src_paths = ["s2spy", "tests"]
+# src_paths = ["s2spy", "tests"]
 line_length = 120
 no_lines_before = ["FUTURE","STDLIB","THIRDPARTY","FIRSTPARTY","LOCALFOLDER"]
 
diff --git a/s2spy/__init__.py b/s2spy/__init__.py
index 1654eae..b829b65 100644
--- a/s2spy/__init__.py
+++ b/s2spy/__init__.py
@@ -3,6 +3,7 @@
 This package is a high-level python package integrating expert knowledge
 and artificial intelligence to boost (sub) seasonal forecasting.
 """
+
 import logging
 
 from .rgdr.rgdr import RGDR
diff --git a/s2spy/preprocess.py b/s2spy/preprocess.py
index 0f01d97..afddea0 100644
--- a/s2spy/preprocess.py
+++ b/s2spy/preprocess.py
@@ -1,4 +1,5 @@
 """Preprocessor for s2spy workflow."""
+
 import warnings
 from typing import Literal
 from typing import Union
@@ -7,51 +8,177 @@
 import xarray as xr
 
 
-def _linregress(x: np.ndarray, y: np.ndarray) -> tuple[float, float]:
+def _linregress(
+    x: np.ndarray,
+    y: np.ndarray,
+    nan_mask: Literal["individual", "complete"] = "individual",
+) -> tuple[float, float]:
     """Calculate the slope and intercept between two arrays using scipy's linregress.
 
     Used to make linregress more ufunc-friendly.
+
     Args:
         x: First array.
         y: Second array.
+        nan_mask: How to handle NaN values. If 'complete', returns NaN if x or y contains
+            one or more NaN values. If 'individual', fit a trend by masking only the
+            indices with the NaNs.
 
     Returns:
         slope, intercept
     """
-    slope, intercept, _, _, _ = scipy.stats.linregress(x, y)
-    return slope, intercept
-
-
-def _trend_linear(data: Union[xr.DataArray, xr.Dataset]) -> dict:
+    if nan_mask == "individual":
+        mask = np.logical_or(np.isnan(x), np.isnan(y))
+        if np.all(mask):
+            slope, intercept = np.nan, np.nan
+        elif np.any(mask):
+            slope, intercept, _, _, _ = scipy.stats.linregress(x[~mask], y[~mask])
+        else:
+            slope, intercept, _, _, _ = scipy.stats.linregress(x, y)
+        return slope, intercept
+    elif nan_mask == "complete":
+        if np.logical_or(np.isnan(x), np.isnan(y)).any():
+            return np.nan, np.nan  # any NaNs in the timeseries: return NaN.
+        slope, intercept, _, _, _ = scipy.stats.linregress(x, y)
+        return slope, intercept
+
+
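# --- Editor's note: illustrative sketch, not part of the patch ------------------
# A standalone sketch of the two NaN strategies that `_linregress` above
# implements, written against plain numpy/scipy. The array values are made up.
import numpy as np
import scipy.stats

x = np.arange(10, dtype=float)
y = 2.0 * x + 1.0
y[3] = np.nan  # one missing sample in the timeseries

# 'individual': drop only the NaN samples, then fit the remaining pairs.
mask = np.isnan(x) | np.isnan(y)
slope, intercept = scipy.stats.linregress(x[~mask], y[~mask])[:2]
print(slope, intercept)  # ~2.0, ~1.0 despite the gap

# 'complete': a single NaN anywhere invalidates the whole fit.
if mask.any():
    slope, intercept = np.nan, np.nan
print(slope, intercept)  # nan, nan
# --------------------------------------------------------------------------------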
+def _trend_linear(
+    data: Union[xr.DataArray, xr.Dataset], nan_mask: str = "complete"
+) -> dict:
     """Calculate the linear trend over time.
 
     Args:
         data: The input data of which you want to know the trend.
+        nan_mask: How to handle NaN values. If 'complete', returns NaN if x or y contains
+            one or more NaN values. If 'individual', fit a trend by masking only the
+            indices with the NaNs.
 
     Returns:
         Dictionary containing the linear trend information (slope and intercept)
     """
+    assert nan_mask in [
+        "complete",
+        "individual",
+    ], "nan_mask should be 'complete' or 'individual'"
     slope, intercept = xr.apply_ufunc(
         _linregress,
         data["time"].astype(float),
         data,
-        input_core_dims=[["time"], ["time"]],
+        nan_mask,
+        input_core_dims=[["time"], ["time"], []],
         output_core_dims=[[], []],
         vectorize=True,
     )
     return {"slope": slope, "intercept": intercept}
 
 
+def _get_lineartrend_timeseries(data: Union[xr.DataArray, xr.Dataset], trend: dict):
+    """Calculate the linear trend timeseries from the trend dictionary."""
+    trend_timeseries = trend["intercept"] + trend["slope"] * (
+        data["time"].astype(float)
+    )
+    return trend_timeseries.transpose(*list(data.dims) + [...])
+
+
+def _trend_poly(data: Union[xr.DataArray, xr.Dataset], degree: int = 2) -> dict:
+    """Calculate the polynomial trend over time and return coefficients.
+
+    Args:
+        data: Input data.
+        degree: Degree of the polynomial for detrending.
+
+    Returns:
+        Dictionary containing polynomial trend coefficients.
+    """
+    fixed_timestamp = np.datetime64("1900-01-01")
+    data.coords["ordinal_day"] = (
+        ("time",),
+        (data.time - fixed_timestamp).values.astype("timedelta64[D]").astype(int),
+    )
+    coeffs = data.swap_dims({"time": "ordinal_day"}).polyfit(
+        "ordinal_day", deg=degree, skipna=True
+    )
+    return {"coefficients": coeffs}
+
+
+def _get_polytrend_timeseries(data: Union[xr.DataArray, xr.Dataset], trend: dict):
+    """Calculate the polynomial trend timeseries from the trend dictionary."""
+    fixed_timestamp = np.datetime64("1900-01-01")
+    polynomial_trend = (
+        xr.polyval(
+            data.assign_coords(
+                ordinal_day=(
+                    "time",
+                    (data.time - fixed_timestamp)
+                    .values.astype("timedelta64[D]")
+                    .astype(int),
+                )
+            ).swap_dims({"time": "ordinal_day"})["ordinal_day"],
+            trend["coefficients"],
+        ).swap_dims({"ordinal_day": "time"})
+    ).drop_vars("ordinal_day")
+    # xr.polyval names its outputs "<var>_polyfit_coefficients"; rename them back
+    # to the original variable names so the trend lines up with the input data.
+    if isinstance(data, xr.Dataset):
+        da_names = list(data.data_vars)
+        rename = dict(
+            map(
+                lambda i, j: (i, j),
+                list(polynomial_trend.data_vars),
+                da_names,
+            )
+        )
+        polynomial_trend = polynomial_trend.rename(rename)
+    if isinstance(
+        data, xr.DataArray
+    ):  # keep consistent with input data and _get_lineartrend_timeseries
+        polynomial_trend = (
+            polynomial_trend.to_array().squeeze("variable").drop_vars("variable")
+        )
+        polynomial_trend.name = (
+            data.name if data.name is not None else "timeseries_polyfit"
+        )
+    return polynomial_trend.transpose(*list(data.dims) + [...])
+
+
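# --- Editor's note: illustrative sketch, not part of the patch ------------------
# Standalone sketch of the ordinal-day round trip used by `_trend_poly` and
# `_get_polytrend_timeseries`: timestamps are converted to integer days since a
# fixed reference so polyfit/polyval work on a plain numeric coordinate. The
# synthetic data below is made up for illustration.
import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2000-01-01", periods=3 * 365, freq="D")
ordinal_day = np.asarray((time - np.datetime64("1900-01-01")).days)
values = 1e-6 * ordinal_day**2 + np.random.rand(time.size)
da = xr.DataArray(values, coords={"time": time}, dims="time", name="sst")
da = da.assign_coords(ordinal_day=("time", ordinal_day))

# Fit a degree-2 polynomial along the integer coordinate ...
coeffs = da.swap_dims({"time": "ordinal_day"}).polyfit("ordinal_day", deg=2, skipna=True)
# ... and evaluate it back on that coordinate to recover the trend timeseries.
trend = xr.polyval(da["ordinal_day"], coeffs)["polyfit_coefficients"]
detrended = da - trend
# --------------------------------------------------------------------------------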
 def _subtract_linear_trend(data: Union[xr.DataArray, xr.Dataset], trend: dict):
     """Subtract a previously calclulated linear trend from (new) data."""
-    return data - trend["intercept"] - trend["slope"] * (data["time"].astype(float))
+    return data - _get_lineartrend_timeseries(data, trend)
+
+
+def _subtract_polynomial_trend(
+    data: Union[xr.DataArray, xr.Dataset],
+    trend: dict,
+):
+    """Subtract a previously calculated polynomial trend from (new) data.
+
+    Args:
+        data: The data from which to subtract the trend (either an xarray DataArray or Dataset).
+        trend: A dictionary containing the polynomial trend coefficients.
+
+    Returns:
+        The data with the polynomial trend subtracted.
+    """
+    # Subtract the polynomial trend from the data
+    return data - _get_polytrend_timeseries(data, trend)
 
 
-def _get_trend(data: Union[xr.DataArray, xr.Dataset], method: str):
+def _get_trend(
+    data: Union[xr.DataArray, xr.Dataset],
+    method: str,
+    nan_mask: str = "complete",
+    degree=2,
+):
     """Calculate the trend, with a certain method. Only linear is implemented."""
     if method == "linear":
-        return _trend_linear(data)
+        return _trend_linear(data, nan_mask)
+
+    if method == "polynomial":
+        if nan_mask != "complete":
+            raise ValueError("Polynomial currently only supports 'complete' nan_mask")
+        return _trend_poly(data, degree)
     raise ValueError(f"Unkown detrending method '{method}'")
 
 
@@ -59,6 +186,8 @@ def _subtract_trend(data: Union[xr.DataArray, xr.Dataset], method: str, trend: d
     """Subtract the previously calculated trend from (new) data. Only linear is implemented."""
     if method == "linear":
         return _subtract_linear_trend(data, trend)
+    if method == "polynomial":
+        return _subtract_polynomial_trend(data, trend)
     raise NotImplementedError
 
 
@@ -80,6 +209,20 @@
     return climatology
 
 
+def climatology_to_timeseries(
+    group,
+    climatology,
+    timescale: Literal["monthly", "weekly", "daily"],
+):
+    """Convert climatology to timeseries."""
+    if timescale == "monthly":
+        return climatology.sel(month=group.time.dt.month).drop_vars("month")
+    elif timescale == "weekly":
+        return climatology.sel(week=group.time.dt.isocalendar().week).drop_vars("week")
+    elif timescale == "daily":
+        return climatology.sel(dayofyear=group.time.dt.dayofyear).drop_vars("dayofyear")
+
+
 def _subtract_climatology(
     data: Union[xr.Dataset, xr.DataArray],
     timescale: Literal["monthly", "weekly", "daily"],
@@ -172,6 +315,7 @@ def __init__(  # noqa: PLR0913
         rolling_min_periods: int = 1,
         subtract_climatology: bool = True,
         detrend: Union[str, None] = "linear",
+        nan_mask: str = "complete",
     ):
         """Preprocessor for s2s data. Can detrend as well as deseasonalize.
 
@@ -194,14 +338,20 @@
             and end of the preprocessed data.
             subtract_climatology (optional): If you want to calculate and remove the
                 climatology of the data. Defaults to True.
-            detrend (optional): Which method to use for detrending. Currently the only method
-                supported is "linear". If you want to skip detrending, set this to None.
+            detrend (optional): Which method to use for detrending. Choose from "linear"
+                or "polynomial". Defaults to "linear". If you want to skip detrending,
+                set this to None.
             timescale: Temporal resolution of input data.
+            nan_mask: How to handle NaN values when fitting the trend. If 'complete',
+                a gridpoint's trend is NaN when its timeseries contains one or more
+                NaN values. If 'individual', the trend is fit with the NaNs masked out.
         """
         self._window_size = rolling_window_size
         self._min_periods = rolling_min_periods
         self._detrend = detrend
         self._subtract_climatology = subtract_climatology
+        self._nan_mask = nan_mask
+
         if subtract_climatology:
             self._timescale = _check_temporal_resolution(timescale)
 
@@ -228,15 +378,14 @@ def fit(self, data: Union[xr.DataArray, xr.Dataset]) -> None:
 
         if self._subtract_climatology:
             self._climatology = _get_climatology(data_rolling, self._timescale)
-
         if self._detrend is not None:
             if self._subtract_climatology:
                 deseasonalized = _subtract_climatology(
                     data_rolling, self._timescale, self._climatology
                 )
-                self._trend = _get_trend(deseasonalized, self._detrend)
+                self._trend = _get_trend(deseasonalized, self._detrend, self._nan_mask)
             else:
-                self._trend = _get_trend(data_rolling, self._detrend)
+                self._trend = _get_trend(data_rolling, self._detrend, self._nan_mask)
 
         self._is_fit = True
 
@@ -281,6 +430,61 @@ def fit_transform(
         self.fit(data)
         return self.transform(data)
 
+    def get_trend_timeseries(self, data, align_coords: bool = False):
+        """Get the trend timeseries from the data.
+
+        Args:
+            data (xr.DataArray or xr.Dataset): input data.
+            align_coords (bool): Construction of the trend timeseries only uses the time
+                coordinate, so the input array may be a (spatial) subset of the array the
+                preprocessor was fit on; without alignment the trend is returned on the
+                original coordinates stored in the fitted trend. With align_coords=True,
+                the trend timeseries is aligned to the data passed to this function.
+        """
+        if not self._is_fit:
+            raise ValueError(
+                "The preprocessor has to be fit to data before the trend"
+                " timeseries can be requested."
+            )
+        if self._detrend is None:
+            raise ValueError("Detrending is set to `None`, so no trend is available")
+        if align_coords:
+            trend = self.align_trend_coords(data)
+        else:
+            trend = self.trend
+        if self._detrend == "linear":
+            return _get_lineartrend_timeseries(data, trend)
+        elif self._detrend == "polynomial":
+            return _get_polytrend_timeseries(data, trend)
+        raise ValueError(f"Unknown detrending method '{self._detrend}'")
+
+    def get_climatology_timeseries(self, data, align_coords: bool = False):
+        """Get the climatology timeseries from the data.
+
+        Args:
+            data (xr.DataArray or xr.Dataset): input data.
+            align_coords (bool): Construction of the climatology timeseries only uses the
+                time coordinate. This aligns the coords to the data. See
+                `get_trend_timeseries` for more information.
+        """
+        if not self._is_fit:
+            raise ValueError(
+                "The preprocessor has to be fit to data before the climatology"
+                " timeseries can be requested."
+            )
+        if not self._subtract_climatology:
+            raise ValueError(
+                "`subtract_climatology` is set to `False`, so no climatology "
+                "data is available"
+            )
+        if align_coords:
+            climatology = xr.align(self.climatology, data)[0]
+        else:
+            climatology = self.climatology
+        return data.groupby("time.year").map(
+            lambda x: climatology_to_timeseries(x, climatology, self._timescale)
+        )
+
     @property
     def trend(self) -> dict:
         """Return the stored trend (dictionary)."""
@@ -307,3 +511,22 @@ def climatology(self) -> Union[xr.DataArray, xr.Dataset]:
                 " climatology can be requested."
             )
         return self._climatology
+
+    def align_trend_coords(self, data):
+        """Align coordinates between data and the stored trend.
+
+        Args:
+            data (xr.DataArray or xr.Dataset): time, (lat), (lon) array to which
+                the stored trend coordinates should be aligned.
+        """
+        if self._detrend == "linear":
+            align_trend = self.trend.copy()
+            align_trend["intercept"], align_trend["slope"] = xr.align(
+                *[align_trend["intercept"], align_trend["slope"], data]
+            )[:2]
+        elif self._detrend == "polynomial":
+            align_trend = self.trend.copy()
+            align_trend["coefficients"] = xr.align(align_trend["coefficients"], data)[0]
+        else:
+            raise ValueError(f"Unknown detrending method '{self._detrend}'")
+        return align_trend
diff --git a/s2spy/rgdr/__init__.py b/s2spy/rgdr/__init__.py
index 7be7ebe..768b784 100644
--- a/s2spy/rgdr/__init__.py
+++ b/s2spy/rgdr/__init__.py
@@ -1,2 +1,3 @@
 """Response Guided Dimensionality Reduction."""
+
 from . import label_alignment  # noqa: F401 (unused import)
diff --git a/s2spy/rgdr/label_alignment.py b/s2spy/rgdr/label_alignment.py
index e409645..b55649e 100644
--- a/s2spy/rgdr/label_alignment.py
+++ b/s2spy/rgdr/label_alignment.py
@@ -1,4 +1,5 @@
 """Label alignment tools for RGDR clusters."""
+
 import itertools
 import string
 from copy import copy
diff --git a/s2spy/rgdr/rgdr.py b/s2spy/rgdr/rgdr.py
index 6c8dd34..dfd4737 100644
--- a/s2spy/rgdr/rgdr.py
+++ b/s2spy/rgdr/rgdr.py
@@ -1,4 +1,5 @@
 """Response Guided Dimensionality Reduction."""
+
 import warnings
 from os import linesep
 from typing import Optional
@@ -606,12 +607,12 @@ def transform(self, data: xr.DataArray) -> xr.DataArray:
         # Add the geographical centers for later alignment between, e.g., splits
         reduced_data = utils.geographical_cluster_center(data, reduced_data)
         # Include explanations about geographical centers as attributes
-        reduced_data.attrs[
-            "data"
-        ] = "Clustered data with Response Guided Dimensionality Reduction."
-        reduced_data.attrs[
-            "coordinates"
-        ] = "Latitudes and longitudes are geographical centers associated with clusters."
+        reduced_data.attrs["data"] = (
+            "Clustered data with Response Guided Dimensionality Reduction."
+        )
+        reduced_data.attrs["coordinates"] = (
+            "Latitudes and longitudes are geographical centers associated with clusters."
+        )
 
         # Remove the '0' cluster
         reduced_data = reduced_data.where(reduced_data["cluster_labels"] != 0).dropna(
diff --git a/s2spy/rgdr/utils.py b/s2spy/rgdr/utils.py
index f6826a8..aed6d4a 100644
--- a/s2spy/rgdr/utils.py
+++ b/s2spy/rgdr/utils.py
@@ -1,4 +1,5 @@
 """Commonly used utility functions for s2spy."""
+
 from typing import TypeVar
 import numpy as np
 import xarray as xr
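# --- Editor's note: illustrative usage sketch, not part of the patch ------------
# How the extended Preprocessor API above is intended to be used (polynomial
# detrending, `nan_mask`, and the new `get_trend_timeseries` /
# `get_climatology_timeseries` helpers). The dataset is synthetic; the variable
# and coordinate names ("sst", "latitude", "longitude") are assumptions.
import numpy as np
import pandas as pd
import xarray as xr

from s2spy.preprocess import Preprocessor

time = pd.date_range("2000-01-01", "2010-12-31", freq="D")
field = xr.Dataset(
    {"sst": (("time", "latitude", "longitude"), np.random.rand(time.size, 2, 2))},
    coords={"time": time, "latitude": [40.0, 41.0], "longitude": [0.0, 1.0]},
)

prep = Preprocessor(
    rolling_window_size=25,
    timescale="daily",
    detrend="polynomial",  # or "linear"
    subtract_climatology=True,
    nan_mask="complete",  # "individual" is only supported for linear detrending
)
train = field.sel(time=slice("2000", "2008"))
prep.fit(train)  # climatology and trend are estimated on the training years only
anomalies = prep.transform(field)  # and can then be removed from any (new) data

# Diagnostics: reconstruct the removed components, aligned to a spatial subset.
subset = field.isel(latitude=[0], longitude=[0])
trend_ts = prep.get_trend_timeseries(subset, align_coords=True)
clim_ts = prep.get_climatology_timeseries(subset, align_coords=True)
# --------------------------------------------------------------------------------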
diff --git a/tests/test_preprocess.py b/tests/test_preprocess.py
index 6e0c610..b64f28e 100644
--- a/tests/test_preprocess.py
+++ b/tests/test_preprocess.py
@@ -1,4 +1,5 @@
 """Tests for the s2spy.preprocess module."""
+
 import numpy as np
 import pytest
 import scipy.signal
@@ -100,11 +101,12 @@ class TestPreprocessor:
     """Test preprocessor."""
 
     @pytest.fixture
-    def preprocessor(self):
+    def preprocessor(self, request):
+        method = request.param[0]
         prep = preprocess.Preprocessor(
             rolling_window_size=25,
             timescale="daily",
-            detrend="linear",
+            detrend=method,
             subtract_climatology=True,
         )
         return prep
@@ -158,6 +160,16 @@ def test_init(self):
         )
         assert isinstance(prep, Preprocessor)
 
+    # The `preprocessor` fixture is parametrized indirectly with the detrend method.
+
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
     def test_fit(self, preprocessor, raw_field):
         preprocessor.fit(raw_field)
         assert (
@@ -170,11 +182,27 @@ def test_fit_no_rolling(self, preprocessor_no_rolling, raw_field):
             raw_field, timescale="daily"
         )
 
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
     def test_transform(self, preprocessor, raw_field):
         preprocessor.fit(raw_field)
         preprocessed_data = preprocessor.transform(raw_field)
         assert preprocessed_data is not None
 
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
     def test_transform_without_fit(self, preprocessor, raw_field):
         with pytest.raises(ValueError):
             preprocessor.transform(raw_field)
@@ -209,10 +237,51 @@ def test_fit_and_transform_no_climatology_and_detrend(self, raw_field):
 
         assert results == raw_field
 
-    def test_fit_transform(self, preprocessor, raw_field):
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
+    def test_fit_transform_ds(self, preprocessor, raw_field):
         preprocessed_data = preprocessor.fit_transform(raw_field)
         assert preprocessed_data is not None
 
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
+    def test_fit_transform_da(self, preprocessor, raw_field):
+
+        raw_field = raw_field.to_array().squeeze("variable").drop_vars("variable")
+        raw_field.name = "da_name"
+        years = np.unique(raw_field.time.dt.year.values)
+        train = raw_field.sel(time=raw_field.time.dt.year.isin([years[:-1]]))
+        transform_to = raw_field.sel(time=raw_field.time.dt.year.isin([years[-2:]]))
+        fit_transformed = preprocessor.fit_transform(train)
+        transformed = preprocessor.transform(transform_to)
+        assert fit_transformed is not None
+        assert bool(
+            (
+                fit_transformed.sel(time="2013-01-01")
+                == transformed.sel(time="2013-01-01")
+            ).all()
+        )
+
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
     def test_trend_property_not_fit(self, preprocessor):
         with pytest.raises(ValueError, match="The preprocessor has to be fit"):
             preprocessor.trend
@@ -222,6 +291,14 @@ def test_trend_property_no_detrend(self, preprocessor_no_detrend, raw_field):
         with pytest.raises(ValueError, match="Detrending is set to `None`"):
             preprocessor_no_detrend.trend
 
+    @pytest.mark.parametrize(
+        "preprocessor",
+        [
+            ("linear",),
+            ("polynomial",),
+        ],
+        indirect=True,
+    )
     def test_climatology_property_not_fit(self, preprocessor):
         with pytest.raises(ValueError, match="The preprocessor has to be fit"):
             preprocessor.climatology
@@ -229,3 +306,105 @@ def test_climatology_property_not_fit(self, preprocessor):
     def test_trend_property_no_climatology(self, preprocessor_no_climatology):
         with pytest.raises(ValueError, match="subtract_climatology is set to `False`"):
             preprocessor_no_climatology.climatology
+
+    def test_trend_with_nan(self, raw_field):
+        prep = preprocess.Preprocessor(
+            rolling_window_size=1,
+            timescale="daily",
+            detrend="linear",
+            subtract_climatology=True,
+            nan_mask="complete",
+        )
+        single_doy = raw_field["sst"].sel(time=raw_field["sst"].time.dt.dayofyear == 1)
+        single_doy[:2, 0, 0] = np.nan  # [0,0] lat/lon NaN at timestep 0, 1
+        single_doy[2:, 1, 1] = np.nan  # [1:,1,1] lat/lon NaN at timestep 2:end
+
+        pp_field = prep.fit_transform(single_doy)
+        nans_in_pp_field = np.isnan(pp_field).sum("time")[0, 0]
+        assert int(nans_in_pp_field) == np.unique(pp_field.time.dt.year).size, (
+            "If any NaNs are present in the data, "
+            "the entire timeseries should have become completely NaN in the output."
+        )
+
+        prep = preprocess.Preprocessor(
+            rolling_window_size=1,
+            timescale="daily",
+            detrend="linear",
+            subtract_climatology=True,
+            nan_mask="individual",
+        )
+
+        pp_field = prep.fit_transform(single_doy)
+        assert (
+            np.isnan(pp_field).sum("time") == np.isnan(single_doy).sum("time")
+        ).all(), (
+            "If any NaNs are present in the data, "
+            "the preprocessor only ignores those NaNs but still fits a trendline. "
+            "Hence, the NaNs should remain the same in this case."
+        )
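# --- Editor's note: illustrative sketch, not part of the patch ------------------
# Minimal standalone example of the `indirect=True` pattern used throughout these
# tests: the parametrized tuple is routed to the fixture via `request.param`
# instead of being passed to the test function directly. Names are illustrative.
import pytest


@pytest.fixture
def detrend_method(request):
    # request.param is the tuple from the parametrize call, e.g. ("linear",)
    return request.param[0]


@pytest.mark.parametrize(
    "detrend_method",
    [("linear",), ("polynomial",)],
    indirect=True,
)
def test_method_is_known(detrend_method):
    assert detrend_method in ("linear", "polynomial")
# --------------------------------------------------------------------------------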
+ ) + + @pytest.mark.parametrize( + "preprocessor", + [ + ("linear",), + ("polynomial",), + ], + indirect=True, + ) + def test_get_trendtimeseries_dataset(self, preprocessor, raw_field): + preprocessor.fit(raw_field) + trend = preprocessor.get_trend_timeseries(raw_field) + assert trend is not None + assert trend.dims == raw_field.dims + assert trend.sst.shape == raw_field.sst.shape + + # get timeseries if single lat-lon point is seleted: + subset_latlon = raw_field.isel(latitude=[0], longitude=[0]) + trend = preprocessor.get_trend_timeseries(subset_latlon, align_coords=True) + assert trend is not None + assert trend.dims == subset_latlon.dims + assert trend.sst.shape == subset_latlon.sst.shape + + @pytest.mark.parametrize( + "preprocessor", + [ + ("linear",), + ("polynomial",), + ], + indirect=True, + ) + def test_get_trendtimeseries_dataarray(self, preprocessor, raw_field): + raw_field = raw_field.to_array().squeeze("variable").drop_vars("variable") + preprocessor.fit(raw_field) + trend = preprocessor.get_trend_timeseries(raw_field) + assert trend is not None + assert ( + trend.dims == raw_field.dims + ), f"dims do not match \n {trend.dims} \n {raw_field.dims}" + assert ( + trend.shape == raw_field.shape + ), f"shape does not match \n {trend.shape} \n {raw_field.shape}" + + @pytest.mark.parametrize( + "preprocessor", + [ + ("linear",), + ("polynomial",), + ], + indirect=True, + ) + def test_get_climatology_timeseries(self, preprocessor, raw_field): + preprocessor.fit(raw_field) + climatology = preprocessor.get_climatology_timeseries(raw_field) + assert climatology is not None + assert climatology.dims == raw_field.dims + assert climatology.sst.shape == raw_field.sst.shape + + # get timeseries if single lat-lon point is seleted: + subset_latlon = raw_field.isel(latitude=[0], longitude=[0]) + climatology = preprocessor.get_climatology_timeseries( + subset_latlon, align_coords=True + ) + assert climatology is not None + assert climatology.dims == subset_latlon.dims + assert climatology.sst.shape == subset_latlon.sst.shape diff --git a/tests/test_rgdr/test_rgdr.py b/tests/test_rgdr/test_rgdr.py index 38a2681..5427270 100644 --- a/tests/test_rgdr/test_rgdr.py +++ b/tests/test_rgdr/test_rgdr.py @@ -1,4 +1,5 @@ """Tests for the s2s.rgdr module.""" + import matplotlib import matplotlib.pyplot as plt import numpy as np @@ -141,16 +142,16 @@ def test_pearsonr_nan(self): def test_correlation(self, dummy_dataarray, dummy_timeseries): c_val, p_val = rgdr.correlation(dummy_dataarray, dummy_timeseries) - np.testing.assert_equal(c_val.values, 1) - np.testing.assert_equal(p_val.values, 0) + np.testing.assert_almost_equal(c_val.values, 1, decimal=7) + np.testing.assert_almost_equal(p_val.values, 0, decimal=7) def test_correlation_dim_name(self, dummy_dataarray, dummy_timeseries): da = dummy_dataarray.rename({"time": "i_interval"}) ts = dummy_timeseries.rename({"time": "i_interval"}) c_val, p_val = rgdr.correlation(da, ts, corr_dim="i_interval") - np.testing.assert_equal(c_val.values, 1) - np.testing.assert_equal(p_val.values, 0) + np.testing.assert_almost_equal(c_val.values, 1, decimal=7) + np.testing.assert_almost_equal(p_val.values, 0, decimal=7) def test_correlation_wrong_target_dim_name(self, dummy_dataarray, dummy_timeseries): ts = dummy_timeseries.rename({"time": "dummy"})