From 60f520c273e3f40bf02da0e09fcdfcdf56845dd7 Mon Sep 17 00:00:00 2001
From: filippozacchei
Date: Wed, 24 Sep 2025 15:21:48 -0700
Subject: [PATCH 1/2] Add sample_weight to the SINDy fit method. TODO: account
 for weak SINDy

---
 pysindy/optimizers/stlsq.py |  9 +++++
 pysindy/pysindy.py          | 85 ++++++++++++++++++++++++++++++++++++---
 pysindy/utils/base.py       |  4 +-
 3 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/pysindy/optimizers/stlsq.py b/pysindy/optimizers/stlsq.py
index 9cbddd1b0..2c9e12842 100644
--- a/pysindy/optimizers/stlsq.py
+++ b/pysindy/optimizers/stlsq.py
@@ -78,6 +78,15 @@ class STLSQ(BaseOptimizer):
     history_ : list
         History of ``coef_``. ``history_[k]`` contains the values of
         ``coef_`` at iteration k of sequentially thresholded least-squares.
+ 
+ 
+    Notes
+    -----
+    - Supports ``sample_weight`` during :meth:`fit`. Sample weights are applied
+      by rescaling rows of the regression problem (X, y) before column
+      normalization and thresholding. This allows weighted least squares
+      formulations in SINDy.
+    - When ``sample_weight`` is not provided, all samples are treated equally.

     Examples
     --------
diff --git a/pysindy/pysindy.py b/pysindy/pysindy.py
index 4548673b5..25b6a5e66 100644
--- a/pysindy/pysindy.py
+++ b/pysindy/pysindy.py
@@ -294,6 +294,8 @@ def __init__(
         differentiation_method = FiniteDifference(axis=-2)
         self.differentiation_method = differentiation_method
         self.discrete_time = discrete_time
+        self.set_fit_request(sample_weight=True)
+        self.set_score_request(sample_weight=True)

     def fit(
         self,
@@ -302,6 +304,7 @@ def fit(
         x_dot=None,
         u=None,
         feature_names: Optional[list[str]] = None,
+        sample_weight=None,
     ):
         """
         Fit a SINDy model.
@@ -342,6 +345,11 @@ def fit(
         feature_names : list of string, length n_input_features, optional
             Names for the input features (e.g. :code:`['x', 'y', 'z']`).
             If None, will use :code:`['x0', 'x1', ...]`.
+    
+        sample_weight : float or array-like of shape (n_samples,), optional
+            Per-sample weights for the regression. Passed internally to
+            the optimizer (e.g. STLSQ). Supports compatibility with
+            scikit-learn tools such as GridSearchCV when using weighted data.

         Returns
         -------
@@ -371,6 +379,10 @@ def fit(

         self.feature_names = feature_names

+        # User may give one weight per trajectory or one weight per sample
+        if sample_weight is not None:
+            sample_weight = _expand_sample_weights(sample_weight, x)
+
         steps = [
             ("features", self.feature_library),
             ("shaping", SampleConcatter()),
@@ -378,7 +390,7 @@ def fit(
         ]
         x_dot = concat_sample_axis(x_dot)
         self.model = Pipeline(steps)
-        self.model.fit(x, x_dot)
+        self.model.fit(x, x_dot, model__sample_weight=sample_weight)
         self._fit_shape()
         return self

@@ -412,6 +424,7 @@ def predict(self, x, u=None):
         x, _, u = _comprehend_and_validate_inputs(x, 1, None, u, self.feature_library)

         check_is_fitted(self, "model")
+    
         if self.n_control_features_ > 0 and u is None:
             raise TypeError("Model was fit using control variables, so u is required")
         if self.n_control_features_ == 0 and u is not None:
@@ -467,7 +480,7 @@ def print(self, lhs=None, precision=3, **kwargs):
                 names = f"{lhs[i]}"
             print(f"{names} = {eqn}", **kwargs)

-    def score(self, x, t, x_dot=None, u=None, metric=r2_score, **metric_kws):
+    def score(self, x, t, x_dot=None, u=None, metric=r2_score, sample_weight=None, **metric_kws):
         """
         Returns a score for the time derivative prediction produced by the
         model.
@@ -500,9 +513,14 @@ def score(self, x, t, x_dot=None, u=None, metric=r2_score, **metric_kws):
             See `Scikit-learn \
             `_
             for more options.
+    
+        sample_weight : array-like of shape (n_samples,), optional
+            Per-sample weights passed directly to the metric; they override
+            any ``sample_weight`` supplied via ``metric_kws``.

         metric_kws: dict, optional
             Optional keyword arguments to pass to the metric function.
+    

         Returns
         -------
@@ -523,10 +541,21 @@ def score(self, x, t, x_dot=None, u=None, metric=r2_score, **metric_kws):

         x, x_dot = self._process_trajectories(x, t, x_dot)

+        if sample_weight is not None:
+            sample_weight = _expand_sample_weights(sample_weight, x)
+
         x_dot = concat_sample_axis(x_dot)
         x_dot_predict = concat_sample_axis(x_dot_predict)

-        x_dot, x_dot_predict = drop_nan_samples(x_dot, x_dot_predict)
+        if sample_weight is not None:
+            x_dot, x_dot_predict, good_idx = drop_nan_samples(
+                x_dot, x_dot_predict, return_indices=True
+            )
+            sample_weight = sample_weight[good_idx]
+            metric_kws = {**metric_kws, "sample_weight": sample_weight} 
+        else:
+            x_dot, x_dot_predict = drop_nan_samples(x_dot, x_dot_predict)
+
         return metric(x_dot, x_dot_predict, **metric_kws)

     def _process_trajectories(self, x, t, x_dot):
@@ -910,3 +939,53 @@ def comprehend_and_validate(arr, t):
             )
         u = [comprehend_and_validate(ui, ti) for ui, ti in _zip_like_sequence(u, t)]
         return x, x_dot, u
+
+def _expand_sample_weights(sample_weight, trajectories):
+    """Expand trajectory-level weights to per-sample weights.
+
+    Parameters
+    ----------
+    sample_weight : array-like of shape (n_trajectories,) or (n_samples,)
+        If length == n_trajectories, each trajectory weight is expanded to cover
+        all samples in that trajectory.
+        If length == n_samples, interpreted as per-sample weights directly.
+        If None, uniform weighting is applied.
+
+    trajectories : list of array-like
+        The list of input trajectories, each shape (n_samples_i, n_features).
+
+    Returns
+    -------
+    sample_weight : ndarray of shape (sum_i n_samples_i,)
+        Per-sample weights, ready to use in metrics.
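+
+    Examples
+    --------
+    A minimal illustration: two trajectories of lengths 3 and 2, with one
+    weight per trajectory.
+
+    >>> import numpy as np
+    >>> trajectories = [np.zeros((3, 2)), np.zeros((2, 2))]
+    >>> _expand_sample_weights([1.0, 2.0], trajectories)
+    array([1., 1., 1., 2., 2.])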
+ """ + if sample_weight is None: + return None + + sample_weight = np.asarray(sample_weight) + + n_traj = len(trajectories) + n_samples_total = sum(len(traj) for traj in trajectories) + + if sample_weight.ndim == 1 and len(sample_weight) == n_traj: + # Efficient expansion using np.repeat + traj_lengths = [len(traj) for traj in trajectories] + return np.repeat(sample_weight, traj_lengths) + + if sample_weight.ndim == 1 and len(sample_weight) == n_samples_total: + return sample_weight + + raise ValueError( + f"sample_weight must be length {n_traj} (per trajectory) or " + f"{n_samples_total} (per sample), got {len(sample_weight)}" + ) diff --git a/pysindy/utils/base.py b/pysindy/utils/base.py index a2ddc8401..a03c24d2f 100644 --- a/pysindy/utils/base.py +++ b/pysindy/utils/base.py @@ -128,7 +128,7 @@ def _check_control_shape(x, u, trim_last_point): return u_arr -def drop_nan_samples(x, y): +def drop_nan_samples(x, y, return_indices: bool = False): """Drops samples from x and y where either has a nan value""" x_non_sample_axes = tuple(ax for ax in range(x.ndim) if ax != x.ax_sample) y_non_sample_axes = tuple(ax for ax in range(y.ndim) if ax != y.ax_sample) @@ -137,6 +137,8 @@ def drop_nan_samples(x, y): good_sample_ind = np.nonzero(x_good_samples & y_good_samples)[0] x = x.take(good_sample_ind, axis=x.ax_sample) y = y.take(good_sample_ind, axis=y.ax_sample) + if return_indices: + return x, y, good_sample_ind return x, y From 97c0e102e9e5ac385e2279cf3876f372cb2014be Mon Sep 17 00:00:00 2001 From: filippozacchei Date: Tue, 14 Oct 2025 09:59:42 -0700 Subject: [PATCH 2/2] Files to be modified --- pysindy/_core.py | 162 ++++++++++++---- pysindy/feature_library/__init__.py | 2 + .../weighted_weak_pde_library.py | 151 +++++++++++++++ pysindy/optimizers/stlsq.py | 4 +- pysindy/utils/_axes.py | 3 +- test/test_expand_weights.py | 182 ++++++++++++++++++ test/test_weights_routing.py | 72 +++++++ 7 files changed, 538 insertions(+), 38 deletions(-) create mode 100644 pysindy/feature_library/weighted_weak_pde_library.py create mode 100644 test/test_expand_weights.py create mode 100644 test/test_weights_routing.py diff --git a/pysindy/_core.py b/pysindy/_core.py index 2cd38986e..b179ac025 100644 --- a/pysindy/_core.py +++ b/pysindy/_core.py @@ -11,10 +11,13 @@ from scipy.integrate import odeint from scipy.integrate import solve_ivp from scipy.interpolate import interp1d +from sklearn import set_config from sklearn.base import BaseEstimator from sklearn.metrics import r2_score from sklearn.pipeline import Pipeline from sklearn.utils.validation import check_is_fitted + +set_config(enable_metadata_routing=True) from typing_extensions import Self from .differentiation import BaseDifferentiation @@ -295,6 +298,7 @@ def __init__( self.discrete_time = discrete_time self.set_fit_request(sample_weight=True) self.set_score_request(sample_weight=True) + self.optimizer.set_fit_request(sample_weight=True) def fit( self, @@ -344,7 +348,7 @@ def fit( feature_names : list of string, length n_input_features, optional Names for the input features (e.g. :code:`['x', 'y', 'z']`). If None, will use :code:`['x0', 'x1', ...]`. - + sample_weight : float or array-like of shape (n_samples,), optional Per-sample weights for the regression. Passed internally to the optimizer (e.g. STLSQ). 
From 97c0e102e9e5ac385e2279cf3876f372cb2014be Mon Sep 17 00:00:00 2001
From: filippozacchei
Date: Tue, 14 Oct 2025 09:59:42 -0700
Subject: [PATCH 2/2] Add weighted weak-form library, weight expansion, and
 sample_weight routing

---
 pysindy/_core.py                    | 162 ++++++++++++----
 pysindy/feature_library/__init__.py |   2 +
 .../weighted_weak_pde_library.py    | 151 +++++++++++++++
 pysindy/optimizers/stlsq.py         |   4 +-
 pysindy/utils/_axes.py              |   3 +-
 test/test_expand_weights.py         | 182 ++++++++++++++++++
 test/test_weights_routing.py        |  72 +++++++
 7 files changed, 538 insertions(+), 38 deletions(-)
 create mode 100644 pysindy/feature_library/weighted_weak_pde_library.py
 create mode 100644 test/test_expand_weights.py
 create mode 100644 test/test_weights_routing.py

diff --git a/pysindy/_core.py b/pysindy/_core.py
index 2cd38986e..b179ac025 100644
--- a/pysindy/_core.py
+++ b/pysindy/_core.py
@@ -11,10 +11,13 @@
 from scipy.integrate import odeint
 from scipy.integrate import solve_ivp
 from scipy.interpolate import interp1d
+from sklearn import set_config
 from sklearn.base import BaseEstimator
 from sklearn.metrics import r2_score
 from sklearn.pipeline import Pipeline
 from sklearn.utils.validation import check_is_fitted
+
+set_config(enable_metadata_routing=True)
 from typing_extensions import Self

 from .differentiation import BaseDifferentiation
@@ -295,6 +298,7 @@ def __init__(
         self.discrete_time = discrete_time
         self.set_fit_request(sample_weight=True)
         self.set_score_request(sample_weight=True)
+        self.optimizer.set_fit_request(sample_weight=True)

     def fit(
         self,
@@ -344,7 +348,7 @@ def fit(
         feature_names : list of string, length n_input_features, optional
             Names for the input features (e.g. :code:`['x', 'y', 'z']`).
             If None, will use :code:`['x0', 'x1', ...]`.
-    
+
         sample_weight : float or array-like of shape (n_samples,), optional
             Per-sample weights for the regression. Passed internally to
             the optimizer (e.g. STLSQ). Supports compatibility with
@@ -378,10 +382,16 @@ def fit(

         self.feature_names = feature_names

-        # User may give one weight per trajectory or one weight per sample
-        if sample_weight is not None:
-            sample_weight = _expand_sample_weights(sample_weight, x)
-
+        if sample_weight is not None:
+            mode = (
+                "weak"
+                if "Weak" in self.feature_library.__class__.__name__
+                else "standard"
+            )
+            sample_weight = _expand_sample_weights(
+                sample_weight, x, feature_library=self.feature_library, mode=mode
+            )
+
         steps = [
             ("features", self.feature_library),
             ("shaping", SampleConcatter()),
@@ -389,7 +399,7 @@ def fit(
         ]
         x_dot = concat_sample_axis(x_dot)
         self.model = Pipeline(steps)
-        self.model.fit(x, x_dot, model__sample_weight=sample_weight)
+        self.model.fit(x, x_dot, sample_weight=sample_weight)
         self._fit_shape()
         return self

@@ -423,7 +433,7 @@ def predict(self, x, u=None):
         x, _, u = _comprehend_and_validate_inputs(x, 1, None, u, self.feature_library)

         check_is_fitted(self, "model")
-    
+
         if self.n_control_features_ > 0 and u is None:
             raise TypeError("Model was fit using control variables, so u is required")
         if self.n_control_features_ == 0 and u is not None:
@@ -479,7 +489,16 @@ def print(self, lhs=None, precision=3, **kwargs):
                 names = f"{lhs[i]}"
             print(f"{names} = {eqn}", **kwargs)

-    def score(self, x, t, x_dot=None, u=None, metric=r2_score, sample_weight=None, **metric_kws):
+    def score(
+        self,
+        x,
+        t,
+        x_dot=None,
+        u=None,
+        metric=r2_score,
+        sample_weight=None,
+        **metric_kws,
+    ):
         """
         Returns a score for the time derivative prediction produced by the
         model.
@@ -512,14 +531,14 @@ def score(self, x, t, x_dot=None, u=None, metric=r2_score, sample_weight=None, *
             See `Scikit-learn \
             `_
             for more options.
-    
+
         sample_weight : array-like of shape (n_samples,), optional
             Per-sample weights passed directly to the metric; they override
             any ``sample_weight`` supplied via ``metric_kws``.

         metric_kws: dict, optional
             Optional keyword arguments to pass to the metric function.
-    
+

         Returns
         -------
@@ -542,7 +561,7 @@ def score(

         if sample_weight is not None:
             sample_weight = _expand_sample_weights(sample_weight, x)
-    
+
         x_dot = concat_sample_axis(x_dot)
         x_dot_predict = concat_sample_axis(x_dot_predict)

@@ -551,7 +570,7 @@ def score(
                 x_dot, x_dot_predict, return_indices=True
             )
             sample_weight = sample_weight[good_idx]
-            metric_kws = {**metric_kws, "sample_weight": sample_weight} 
+            metric_kws = {**metric_kws, "sample_weight": sample_weight}
         else:
             x_dot, x_dot_predict = drop_nan_samples(x_dot, x_dot_predict)

@@ -939,42 +958,115 @@ def comprehend_and_validate(arr, t):
         u = [comprehend_and_validate(ui, ti) for ui, ti in _zip_like_sequence(u, t)]
         return x, x_dot, u

-def _expand_sample_weights(sample_weight, trajectories):
-    """Expand trajectory-level weights to per-sample weights.
+
+def _expand_sample_weights(
+    sample_weight, trajectories, feature_library=None, mode="standard"
+):
+    """
+    Expand per-trajectory or per-sample weights for use in SINDy estimators.

     Parameters
     ----------
-    sample_weight : array-like of shape (n_trajectories,) or (n_samples,)
-        If length == n_trajectories, each trajectory weight is expanded to cover
-        all samples in that trajectory.
-        If length == n_samples, interpreted as per-sample weights directly.
-        If None, uniform weighting is applied.
+    sample_weight : sequence (list or tuple) of scalars or array-likes
+        Weights for each trajectory. In "standard" mode, each entry can be:
In "standard" mode, each entry can be: + - a scalar weight (applied to all samples in that trajectory), or + - an array of length equal to the number of samples (n_time) for that + trajectory. + In "weak" mode, each entry must be a single scalar weight per trajectory. - trajectories : list of array-like - The list of input trajectories, each shape (n_samples_i, n_features). + trajectories : sequence + Sequence of trajectory-like objects, each having attributes `n_time` and + `n_coord`. + + feature_library : object, optional + Library instance used in weak-form mode. Must define attribute `K` + (the number of weak test functions). If missing, assumes K=1 with a warning. + + mode : {'standard', 'weak'}, default='standard' + - "standard": Expand per-sample weights to match concatenated samples. + - "weak": Repeat each trajectory’s single scalar weight `K` times. Returns ------- - sample_weight : ndarray of shape (sum_i n_samples_i,) - Per-sample weights, ready to use in metrics. + np.ndarray or None + A 1D numpy array of concatenated and expanded sample weights, + or None if `sample_weight` is None. """ + # ------------------------------------------------------------- + # Early exit for None + # ------------------------------------------------------------- if sample_weight is None: return None - sample_weight = np.asarray(sample_weight) + if not ( + isinstance(sample_weight, Sequence) + and not isinstance(sample_weight, np.ndarray) + ): + raise ValueError( + "sample_weight must be a list or tuple, not a scalar or numpy array." + ) - n_traj = len(trajectories) - n_samples_total = sum(len(traj) for traj in trajectories) + if len(sample_weight) != len(trajectories): + raise ValueError("sample_weight length must match number of trajectories.") - if sample_weight.ndim == 1 and len(sample_weight) == n_traj: - # Efficient expansion using np.repeat - traj_lengths = [len(traj) for traj in trajectories] - return np.repeat(sample_weight, traj_lengths) + # ------------------------------------------------------------- + # Weak mode: one weight per trajectory, repeated K times + # ------------------------------------------------------------- + if mode == "weak": + if feature_library is None: + raise ValueError("feature_library is required in weak mode.") - if sample_weight.ndim == 1 and len(sample_weight) == n_samples_total: - return sample_weight + K = getattr(feature_library, "K", None) + if K is None: + warnings.warn("feature_library missing 'K'; assuming K=1.", UserWarning) + K = 1 - raise ValueError( - f"sample_weight must be length {n_traj} (per trajectory) or " - f"{n_samples_total} (per sample), got {len(sample_weight)}" - ) + validated = [] + for w, traj in zip(sample_weight, trajectories): + arr = np.asarray(w) + if arr.ndim > 0 and arr.size > 1: + raise ValueError( + "Weak mode expects exactly one weight per trajectory (scalar), " + f"but got shape {arr.shape} for trajectory with {traj.n_time}" + f"samples." 
+ ) + validated.append(float(arr)) + return np.repeat(validated, K) + + # ------------------------------------------------------------- + # Standard mode: expand scalars or per-sample arrays + # ------------------------------------------------------------- + expanded = [] + for w, traj in zip(sample_weight, trajectories): + arr = np.asarray(w) + + # Scalar → expand to all samples in trajectory + if arr.ndim == 0: + arr = np.full(traj.n_time, arr, dtype=float) + + # 1D array → must match number of samples + elif arr.ndim == 1: + if arr.shape[0] != traj.n_time: + raise ValueError( + f"sample_weight length {arr.shape[0]} does" + f" not match trajectory length {traj.n_time}." + ) + + # 2D array → only (n,1) allowed + elif arr.ndim == 2: + if arr.shape[1] != 1: + raise ValueError( + "sample_weight 2D arrays must have second dimension = 1." + ) + if arr.shape[0] != traj.n_time: + raise ValueError( + "sample_weight 2D array length does not match trajectory length." + ) + arr = arr.ravel() + + else: + raise ValueError("Invalid sample_weight shape.") + + expanded.append(arr.ravel()) + + return np.concatenate(expanded) diff --git a/pysindy/feature_library/__init__.py b/pysindy/feature_library/__init__.py index 5a82fc03a..621ea741c 100644 --- a/pysindy/feature_library/__init__.py +++ b/pysindy/feature_library/__init__.py @@ -10,6 +10,7 @@ from .polynomial_library import PolynomialLibrary from .sindy_pi_library import SINDyPILibrary from .weak_pde_library import WeakPDELibrary +from .weighted_weak_pde_library import WeightedWeakPDELibrary __all__ = [ "ConcatLibrary", @@ -21,6 +22,7 @@ "PolynomialLibrary", "PDELibrary", "WeakPDELibrary", + "WeightedWeakPDELibrary", "SINDyPILibrary", "ParameterizedLibrary", "base", diff --git a/pysindy/feature_library/weighted_weak_pde_library.py b/pysindy/feature_library/weighted_weak_pde_library.py new file mode 100644 index 000000000..fe613b925 --- /dev/null +++ b/pysindy/feature_library/weighted_weak_pde_library.py @@ -0,0 +1,151 @@ +import numpy as np + +from ..utils import AxesArray +from .weak_pde_library import WeakPDELibrary + + +class WeightedWeakPDELibrary(WeakPDELibrary): + """ + WeakPDELibrary with GLS whitening via a Cholesky factor built from the + variance field on the spatiotemporal grid. + + Parameters + ---------- + spatiotemporal_weights : ndarray, shape = spatiotemporal_grid.shape[:-1] + Pointwise noise variances σ^2 on the grid (no feature axis). + The covariance of the weak RHS is Cov[V] = M_y diag(σ^2) M_y^T. + + Notes + ----- + The whitener W = L^{-1}, with L L^T = Cov[V], is left-applied to both Θ and V. + This implements min_x || W(Θ x - V) ||_2^2, i.e., GLS in the weak space. + """ + + def __init__(self, *args, spatiotemporal_weights=None, **kwargs): + self.spatiotemporal_weights = spatiotemporal_weights + self._L_chol = None # lower-triangular Cholesky factor of Cov[V] + super().__init__(*args, **kwargs) + + # ------------------------------ core whitening ------------------------------ + + def _build_whitener_from_variance(self): + """ + Construct L such that Cov[V] = L L^T with + Cov[V]_{kℓ} = sum_{g ∈ grid} ( w_k[g] * w_ℓ[g] * σ^2[g] ), + where w_k are the time-derivative weak weights on domain k. + """ + if self.spatiotemporal_weights is None: + self._L_chol = None + return + + # --- robust weight-field shape handling --- + base_grid = np.asarray(self.spatiotemporal_grid) + expected = tuple(base_grid.shape[:-1]) # e.g. 
+        var_grid = np.asarray(self.spatiotemporal_weights)
+
+        if var_grid.shape == expected + (1,):
+            var_grid = var_grid[..., 0]
+        elif var_grid.shape != expected:
+            raise ValueError(
+                f"spatiotemporal_weights must have "
+                f"shape {expected} or {expected + (1,)}, "
+                f"got {var_grid.shape}"
+            )
+
+        # Flattened variance for convenient indexing
+        var_flat = var_grid.ravel(order="C")
+        grid_shape = expected
+        K = self.K
+
+        idx_lists = []
+        val_lists = []
+        for k in range(K):
+            # local multi-index grids (can be 1D, 2D, 3D… arrays)
+            inds_axes = [np.asarray(ax, dtype=np.intp) for ax in self.inds_k[k]]
+            grids = np.meshgrid(*inds_axes, indexing="ij")
+
+            # linearize to 1D!
+            lin_idx = np.ravel_multi_index(tuple(grids), dims=grid_shape, order="C")
+            lin_idx = lin_idx.ravel(order="C")
+
+            # corresponding weak RHS weights, flattened to 1D
+            wk = np.asarray(self.fulltweights[k], dtype=float).ravel(order="C")
+
+            if wk.shape[0] != lin_idx.shape[0]:
+                raise RuntimeError(
+                    f"Weight/variance size mismatch on cell {k}: "
+                    f"wk has {wk.shape[0]} entries, indices have {lin_idx.shape[0]}"
+                )
+
+            vals = wk * np.sqrt(var_flat[lin_idx])
+
+            idx_lists.append(lin_idx)
+            val_lists.append(vals)
+
+        # Build Cov[V] = B B^T with B_{k,i} = w_k[i] * sqrt(var[i])
+        Cov = np.zeros((K, K), dtype=float)
+        for k in range(K):
+            vk = val_lists[k]
+            Cov[k, k] = np.dot(vk, vk)
+            idx_k = idx_lists[k]
+
+            map_k = dict(zip(idx_k.tolist(), vk.tolist()))
+            for ell in range(k + 1, K):
+                s = 0.0
+                idx_e = idx_lists[ell]
+                v_e = val_lists[ell]
+                map_e = dict(zip(idx_e.tolist(), v_e.tolist()))
+                if len(map_k) <= len(map_e):
+                    for j, vkj in map_k.items():
+                        ve = map_e.get(j)
+                        if ve is not None:
+                            s += vkj * ve
+                else:
+                    for j, ve in map_e.items():
+                        vk_j = map_k.get(j)
+                        if vk_j is not None:
+                            s += vk_j * ve
+                Cov[k, ell] = s
+                Cov[ell, k] = s
+
+        avg_diag = np.trace(Cov) / max(K, 1)
+        nugget = 1e-12 * avg_diag
+        Cov.flat[:: K + 1] += nugget
+
+        try:
+            self._L_chol = np.linalg.cholesky(Cov)
+        except np.linalg.LinAlgError:
+            # inflate nugget and retry once
+            Cov.flat[:: K + 1] += max(1e-10, 1e-6 * avg_diag)
+            self._L_chol = np.linalg.cholesky(Cov)
+
+    def _apply_whitener(self, A):
+        """Return L^{-1} A without forming L^{-1} explicitly."""
+        if self._L_chol is None:
+            return A
+        return np.linalg.solve(self._L_chol, A)
+
+    # ------------------------------ hooks ------------------------------
+
+    def _weak_form_setup(self):
+        # parent builds inds_k and the weak weight tensors
+        super()._weak_form_setup()
+        # then build the GLS whitener from the variance field
+        if self.spatiotemporal_weights is not None:
+            self._build_whitener_from_variance()
+
+    def convert_u_dot_integral(self, u):
+        Vy = super().convert_u_dot_integral(u)  # (K, 1)
+        Vy_w = self._apply_whitener(np.asarray(Vy))
+        return AxesArray(Vy_w, {"ax_sample": 0, "ax_coord": 1})
+
+    def transform(self, x_full):
+        VTheta_list = super().transform(x_full)  # list of (K, n_features)
+        if self._L_chol is None:
+            return VTheta_list
+        out = []
+        for VTheta in VTheta_list:
+            A = np.asarray(VTheta)
+            A_w = self._apply_whitener(A)  # (K, m)
+            out.append(AxesArray(A_w, {"ax_sample": 0, "ax_coord": 1}))
+        return out
diff --git a/pysindy/optimizers/stlsq.py b/pysindy/optimizers/stlsq.py
index 2c9e12842..6914ba34e 100644
--- a/pysindy/optimizers/stlsq.py
+++ b/pysindy/optimizers/stlsq.py
@@ -78,8 +78,8 @@ class STLSQ(BaseOptimizer):
     history_ : list
         History of ``coef_``. ``history_[k]`` contains the values of
         ``coef_`` at iteration k of sequentially thresholded least-squares.
- 
- 
+
+
     Notes
     -----
     - Supports ``sample_weight`` during :meth:`fit`. Sample weights are applied
diff --git a/pysindy/utils/_axes.py b/pysindy/utils/_axes.py
index f554cb616..22240d689 100644
--- a/pysindy/utils/_axes.py
+++ b/pysindy/utils/_axes.py
@@ -69,6 +69,7 @@
 import numpy as np
 from numpy.typing import NDArray
+from sklearn.base import BaseEstimator
 from sklearn.base import TransformerMixin

 HANDLED_FUNCTIONS = {}
@@ -826,7 +827,7 @@ def comprehend_axes(x):
     return axes


-class SampleConcatter(TransformerMixin):
+class SampleConcatter(BaseEstimator, TransformerMixin):
     def __init__(self):
         pass

diff --git a/test/test_expand_weights.py b/test/test_expand_weights.py
new file mode 100644
index 000000000..131c4c6c4
--- /dev/null
+++ b/test/test_expand_weights.py
@@ -0,0 +1,182 @@
+import warnings
+
+import numpy as np
+import pytest
+
+from pysindy._core import _expand_sample_weights
+
+
+class Trajectory:
+    """Minimal trajectory object with attributes matching SINDy expectations."""
+
+    def __init__(self, n_time, n_variables):
+        self.n_time = n_time
+        self.n_coord = n_variables
+
+
+class WeakLibrary:
+    """Mock weak-form feature library with K test functions."""
+
+    def __init__(self, K=2):
+        self.K = K
+
+
+# ---------------------------------------------------------------------
+# STANDARD MODE TESTS
+# ---------------------------------------------------------------------
+
+
+def test_standard_mode_with_scalar_weights():
+    """
+    Each trajectory's scalar weight should expand to one per sample.
+    """
+    trajectories = [Trajectory(4, 2), Trajectory(6, 2)]
+    weights = [1.0, 2.0]
+
+    expanded = _expand_sample_weights(weights, trajectories, mode="standard")
+
+    assert expanded.shape == (10,)  # 4 + 6 samples
+    assert np.allclose(expanded[:4], 1.0)
+    assert np.allclose(expanded[4:], 2.0)
+
+
+def test_standard_mode_with_per_sample_weights():
+    """
+    Per-sample weights should concatenate into one long 1D array.
+    """
+    trajectories = [Trajectory(3, 2), Trajectory(2, 2)]
+    weights = [np.arange(3), np.arange(10, 12)]
+
+    expanded = _expand_sample_weights(weights, trajectories, mode="standard")
+
+    expected = np.concatenate([np.arange(3), np.arange(10, 12)])
+    assert np.allclose(expanded, expected)
+    assert expanded.ndim == 1
+
+
+def test_standard_mode_flattens_column_vectors():
+    """Column vectors (n, 1) should flatten correctly."""
+    trajectories = [Trajectory(5, 3)]
+    weights = [np.arange(5).reshape(-1, 1)]
+
+    expanded = _expand_sample_weights(weights, trajectories, mode="standard")
+
+    assert expanded.shape == (5,)
+    assert np.allclose(expanded, np.arange(5))
+
+
+def test_standard_mode_rejects_mismatched_lengths():
+    """A weight array must match the number of samples in its trajectory."""
+    trajectories = [Trajectory(4, 2)]
+    weights = [np.ones(3)]
+
+    with pytest.raises(ValueError, match="trajectory length"):
+        _expand_sample_weights(weights, trajectories, mode="standard")
+
+
+# ---------------------------------------------------------------------
+# WEAK FORM MODE TESTS
+# ---------------------------------------------------------------------
+
+
+def test_weak_mode_expands_one_weight_per_trajectory():
+    """
+    Weak mode: one scalar weight per trajectory is repeated by K test functions.
+    The total length is n_trajectories * K.
+ """ + num_trajectories = 3 + trajectories = [Trajectory(5, 2) for _ in range(num_trajectories)] + library = WeakLibrary(K=4) + + weights = [1.0, 2.0, 3.0] + + expanded = _expand_sample_weights( + weights, trajectories, feature_library=library, mode="weak" + ) + + assert expanded.shape == (num_trajectories * library.K,) + expected = np.repeat([1.0, 2.0, 3.0], library.K) + assert np.allclose(expanded, expected) + + +def test_weak_mode_rejects_per_sample_weights(): + """ + Weak mode should not accept per-sample weights. + Each trajectory must have exactly one scalar weight. + """ + trajectories = [Trajectory(5, 1)] + library = WeakLibrary(K=2) + weights = [np.ones(5)] # Invalid: multiple samples + + with pytest.raises(ValueError, match="one weight per trajectory"): + _expand_sample_weights( + weights, trajectories, feature_library=library, mode="weak" + ) + + +def test_weak_mode_warns_if_library_missing_K(): + """Warn and assume K=1 if the feature library has no K attribute.""" + trajectories = [Trajectory(2, 2)] + + class LibraryWithoutK: + pass + + library = LibraryWithoutK() + weights = [1.0] + + with warnings.catch_warnings(record=True) as captured: + expanded = _expand_sample_weights( + weights, trajectories, feature_library=library, mode="weak" + ) + + assert expanded.shape == (1 * len(trajectories),) + assert any("missing 'K'" in str(warning.message) for warning in captured) + + +# --------------------------------------------------------------------- +# VALIDATION TESTS +# --------------------------------------------------------------------- + + +def test_rejects_non_list_weights(): + """sample_weight must be a list or tuple, not a numpy array.""" + trajectories = [Trajectory(3, 1)] + with pytest.raises(ValueError, match="list or tuple"): + _expand_sample_weights(np.ones(3), trajectories) + + +def test_rejects_mismatched_number_of_trajectories(): + """The number of weight entries must match the number of trajectories.""" + trajectories = [Trajectory(3, 1), Trajectory(3, 1)] + weights = [np.ones(3)] + with pytest.raises(ValueError, match="length must match"): + _expand_sample_weights(weights, trajectories) + + +# --------------------------------------------------------------------- +# INTEGRATION TEST +# --------------------------------------------------------------------- + + +def test_expanded_weights_work_with_rescale_data(): + """ + Expanded weights should work seamlessly with optimizer._rescale_data. 
+ """ + from pysindy.optimizers.base import _rescale_data + + num_samples, num_features = 8, 3 + X = np.random.randn(num_samples, num_features) + y = np.random.randn(num_samples, 1) + + trajectories = [Trajectory(8, 3)] + weights = [np.linspace(1.0, 2.0, 8)] + + expanded = _expand_sample_weights(weights, trajectories, mode="standard") + + assert expanded.shape == (8,) + X_scaled, y_scaled = _rescale_data(X, y, expanded) + + assert X_scaled.shape == X.shape + assert y_scaled.shape == y.shape + # Check scaling for first sample + assert np.allclose(X_scaled[0], X[0] * np.sqrt(expanded[0])) diff --git a/test/test_weights_routing.py b/test/test_weights_routing.py new file mode 100644 index 000000000..3f9597b56 --- /dev/null +++ b/test/test_weights_routing.py @@ -0,0 +1,72 @@ +import numpy as np +import pytest +from sklearn.linear_model import LinearRegression + +from pysindy import SINDy # adjust import to your package path + + +@pytest.fixture(scope="session") +def simple_systems(): + """Generate two simple 2D dynamical systems: + System A: x' = -y, y' = x + System B: x' = -2y, y' = 2x + """ + t = np.linspace(0, 2 * np.pi, 50) + x_a = np.stack([np.cos(t), np.sin(t)], axis=1) + xdot_a = np.stack([-np.sin(t), np.cos(t)], axis=1) + + x_b = np.stack([np.cos(2 * t), np.sin(2 * t)], axis=1) + xdot_b = np.stack([-2 * np.sin(2 * t), 2 * np.cos(2 * t)], axis=1) + + return (x_a, xdot_a), (x_b, xdot_b) + + +def test_metadata_routing_sample_weight(simple_systems): + """Test that sample weights route correctly through SINDy fit(). + + The expected coefficients are a convex combination of the systems' + true coefficients, weighted by the number of trajectories and/or + sample weights. This verifies that behind-the-scenes routing of + sample_weight → optimizer.fit() works correctly. + """ + (x_a, xdot_a), (x_b, xdot_b) = simple_systems + + # --- Build training trajectories --- + # One system duplicated twice (implicit weighting) + X_trajs = [x_a, x_a, x_b] + Xdot_trajs = [xdot_a, xdot_a, xdot_b] + + # --- Simple library and optimizer setup --- + sindy = SINDy(optimizer=LinearRegression(fit_intercept=False)) + + # --- Fit without explicit sample weights --- + sindy.fit(X_trajs, t=0.1, x_dot=Xdot_trajs) + coef_unweighted = np.copy(sindy.model.named_steps["model"].coef_) + + # --- Fit with sample weights to emphasize trajectory 3 (different system) --- + sample_weight = [np.ones(len(x_a)), np.ones(len(x_a)), 10 * np.ones(len(x_b))] + sindy.fit(X_trajs, t=0.1, x_dot=Xdot_trajs, sample_weight=sample_weight) + coef_weighted = np.copy(sindy.model.named_steps["model"].coef_) + + # --- Assertions --- + # 1. Shapes are consistent + assert coef_weighted.shape == coef_unweighted.shape + + # 2. The coefficients must differ when weighting is applied + assert not np.allclose(coef_weighted, coef_unweighted) + + # 3. Weighted model should bias toward system B coefficients + # since trajectory B had much higher weight + # True systems differ by factor of 2 + ratio = np.mean(np.abs(coef_weighted / coef_unweighted)) + assert ( + ratio > 1.05 + ), "Weighted coefficients should reflect stronger influence from system B" + + # 4. Convex combination logic sanity check + # Unweighted: (A + A + B)/3 = A * 2/3 + B * 1/3 + # Weighted: (A + A + 10*B)/(12) ≈ A * 2/12 + B * 10/12 + # So weighted coefficients should be closer to B's dynamics + assert np.linalg.norm(coef_weighted - 2 * coef_unweighted) < np.linalg.norm( + coef_unweighted + )