From 3aecd2b501f0a9892eccc116a9170f2e0f92228a Mon Sep 17 00:00:00 2001 From: SIKAI ZHANG <34108862+MatthewSZhang@users.noreply.github.com> Date: Thu, 17 Apr 2025 13:43:41 +0800 Subject: [PATCH] FEAT add param fit_intercept for NARX --- .github/workflows/lint.yml | 2 +- .github/workflows/test.yml | 2 +- fastcan/narx.py | 66 +++++++++++++++++++++++++------ tests/test_narx.py | 40 +++++++++++++++++++ tests/test_narx_jac.py | 81 +++++++++++++++++++++++++++++++++++++- 5 files changed, 176 insertions(+), 15 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index e150c3f..ed0f8e3 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: prefix-dev/setup-pixi@v0.8.4 + - uses: prefix-dev/setup-pixi@v0.8.8 with: environments: default cache: true diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1056a21..bb4c99a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,7 +21,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: prefix-dev/setup-pixi@v0.8.3 + - uses: prefix-dev/setup-pixi@v0.8.8 with: environments: default cache: true diff --git a/fastcan/narx.py b/fastcan/narx.py index ab57808..c5c62bf 100644 --- a/fastcan/narx.py +++ b/fastcan/narx.py @@ -570,6 +570,8 @@ class NARX(MultiOutputMixin, RegressorMixin, BaseEstimator): The id numbers indicate which output the polynomial term belongs to. It is useful in multi-output case. + fit_intercept : bool, default=True + Whether to fit the intercept. If set to False, intercept will be zeros. Attributes ---------- @@ -641,6 +643,7 @@ class NARX(MultiOutputMixin, RegressorMixin, BaseEstimator): "feat_ids": [None, "array-like"], "delay_ids": [None, "array-like"], "output_ids": [None, "array-like"], + "fit_intercept": ["boolean"], } def __init__( @@ -649,10 +652,12 @@ def __init__( feat_ids=None, delay_ids=None, output_ids=None, + fit_intercept=True, ): self.feat_ids = feat_ids self.delay_ids = delay_ids self.output_ids = output_ids + self.fit_intercept = fit_intercept @validate_params( { @@ -769,13 +774,12 @@ def fit(self, X, y, sample_weight=None, coef_init=None, **params): ) self.max_delay_ = self.delay_ids_.max() - n_coef_intercept = n_terms + self.n_outputs_ if isinstance(coef_init, (type(None), str)): # fit a one-step-ahead NARX model time_shift_ids, poly_ids = fd2tp(self.feat_ids_, self.delay_ids_) xy_hstack = np.c_[X, y] - osa_narx = LinearRegression() + osa_narx = LinearRegression(fit_intercept=self.fit_intercept) time_shift_vars = make_time_shift_features(xy_hstack, time_shift_ids) poly_terms = make_poly_features(time_shift_vars, poly_ids) # Remove missing values @@ -787,7 +791,10 @@ def fit(self, X, y, sample_weight=None, coef_init=None, **params): for i in range(self.n_outputs_): output_i_mask = self.output_ids_ == i if np.sum(output_i_mask) == 0: - intercept[i] = np.mean(y_masked[:, i]) + if self.fit_intercept: + intercept[i] = np.mean(y_masked[:, i]) + else: + intercept[i] = 0.0 continue osa_narx.fit( poly_terms_masked[:, output_i_mask], @@ -801,13 +808,20 @@ def fit(self, X, y, sample_weight=None, coef_init=None, **params): self.intercept_ = intercept return self - coef_init = np.r_[coef, intercept] + if self.fit_intercept: + coef_init = np.r_[coef, intercept] + else: + coef_init = coef else: coef_init = check_array( coef_init, ensure_2d=False, dtype=float, ) + if self.fit_intercept: + n_coef_intercept = n_terms + self.n_outputs_ + else: + n_coef_intercept = n_terms if coef_init.shape[0] != n_coef_intercept: raise ValueError( "`coef_init` should have the shape of " @@ -829,6 +843,7 @@ def fit(self, X, y, sample_weight=None, coef_init=None, **params): self.feat_ids_, self.delay_ids_, self.output_ids_, + self.fit_intercept, sample_weight_sqrt, grad_yyd_ids, grad_coef_ids, @@ -837,8 +852,12 @@ def fit(self, X, y, sample_weight=None, coef_init=None, **params): ), **params, ) - self.coef_ = res.x[: -self.n_outputs_] - self.intercept_ = res.x[-self.n_outputs_ :] + if self.fit_intercept: + self.coef_ = res.x[: -self.n_outputs_] + self.intercept_ = res.x[-self.n_outputs_ :] + else: + self.coef_ = res.x + self.intercept_ = np.zeros(self.n_outputs_, dtype=float) return self @staticmethod @@ -931,6 +950,7 @@ def _update_dydx( feat_ids, delay_ids, output_ids, + fit_intercept, grad_yyd_ids, grad_coef_ids, grad_feat_ids, @@ -947,8 +967,13 @@ def _update_dydx( n_samples, n_y = y_hat.shape max_delay = np.max(delay_ids) n_coefs = feat_ids.shape[0] - n_x = n_coefs + n_y # Total number of coefficients and intercepts - y_ids = np.r_[output_ids, np.arange(n_y)] + if fit_intercept: + n_x = n_coefs + n_y # Total number of coefficients and intercepts + y_ids = np.r_[output_ids, np.arange(n_y)] + else: + n_x = n_coefs + y_ids = output_ids + x_ids = np.arange(n_x) dydx = np.zeros((n_samples, n_y, n_x), dtype=float) @@ -1011,13 +1036,18 @@ def _loss( feat_ids, delay_ids, output_ids, + fit_intercept, sample_weight_sqrt, *args, ): # Sum of squared errors n_outputs = y.shape[1] - coef = coef_intercept[:-n_outputs] - intercept = coef_intercept[-n_outputs:] + if fit_intercept: + coef = coef_intercept[:-n_outputs] + intercept = coef_intercept[-n_outputs:] + else: + coef = coef_intercept + intercept = np.zeros(n_outputs, dtype=float) y_hat = NARX._predict( expression, @@ -1045,6 +1075,7 @@ def _grad( feat_ids, delay_ids, output_ids, + fit_intercept, sample_weight_sqrt, grad_yyd_ids, grad_coef_ids, @@ -1053,8 +1084,12 @@ def _grad( ): # Sum of squared errors n_outputs = y.shape[1] - coef = coef_intercept[:-n_outputs] - intercept = coef_intercept[-n_outputs:] + if fit_intercept: + coef = coef_intercept[:-n_outputs] + intercept = coef_intercept[-n_outputs:] + else: + coef = coef_intercept + intercept = np.zeros(n_outputs, dtype=float) y_hat = NARX._predict( expression, @@ -1073,6 +1108,7 @@ def _grad( feat_ids, delay_ids, output_ids, + fit_intercept, grad_yyd_ids, grad_coef_ids, grad_feat_ids, @@ -1266,6 +1302,7 @@ def _get_term_str(term_feat_ids, term_delay_ids): "poly_degree": [ Interval(Integral, 1, None, closed="left"), ], + "fit_intercept": ["boolean"], "include_zero_delay": [None, "array-like"], "static_indices": [None, "array-like"], "refine_verbose": ["verbose"], @@ -1288,6 +1325,7 @@ def make_narx( max_delay=1, poly_degree=1, *, + fit_intercept=True, include_zero_delay=None, static_indices=None, refine_verbose=1, @@ -1316,6 +1354,9 @@ def make_narx( poly_degree : int, default=1 The maximum degree of polynomial features. + fit_intercept : bool, default=True + Whether to fit the intercept. If set to False, intercept will be zeros. + include_zero_delay : {None, array-like} of shape (n_features,) default=None Whether to include the original (zero-delay) features. @@ -1469,4 +1510,5 @@ def make_narx( feat_ids=feat_ids, delay_ids=delay_ids, output_ids=output_ids, + fit_intercept=fit_intercept, ) diff --git a/tests/test_narx.py b/tests/test_narx.py index 6f0172e..3d6c21e 100644 --- a/tests/test_narx.py +++ b/tests/test_narx.py @@ -278,6 +278,46 @@ def test_mulit_output_warn(): 0.0, ) +def test_fit_intercept(): + X = np.random.rand(10, 2) + y = np.random.rand(10, 1) + time_shift_ids = np.array([[0, 1], [1, 1]]) + poly_ids = np.array([[1, 1], [2, 2]]) + feat_ids, delay_ids = tp2fd(time_shift_ids, poly_ids) + + narx = NARX( + feat_ids=feat_ids, + delay_ids=delay_ids, + fit_intercept=False, + ) + narx.fit(X, y) + assert_almost_equal(narx.intercept_, 0.0) + narx.fit(X, y, coef_init="one_step_ahead") + assert_almost_equal(narx.intercept_, 0.0) + + X = np.random.rand(10, 2) + y = np.random.rand(10, 2) + time_shift_ids = np.array([[0, 1], [1, 1]]) + poly_ids = np.array([[1, 1], [2, 2]]) + feat_ids, delay_ids = tp2fd(time_shift_ids, poly_ids) + + narx = make_narx(X, y, 1, 2, 2, fit_intercept=False) + narx.fit(X, y) + assert_array_equal(narx.intercept_, [0.0, 0.0]) + narx.fit(X, y, coef_init="one_step_ahead") + assert_array_equal(narx.intercept_, [0.0, 0.0]) + + with pytest.warns(UserWarning, match="output_ids got"): + narx = NARX( + feat_ids=feat_ids, + delay_ids=delay_ids, + fit_intercept=False, + ) + narx.fit(X, y) + assert_array_equal(narx.intercept_, [0.0, 0.0]) + narx.fit(X, y, coef_init=[0, 0]) + assert_array_equal(narx.intercept_, [0.0, 0.0]) + def test_mulit_output_error(): X = np.random.rand(10, 2) y = np.random.rand(10, 2) diff --git a/tests/test_narx_jac.py b/tests/test_narx_jac.py index 9405815..3ce2e68 100644 --- a/tests/test_narx_jac.py +++ b/tests/test_narx_jac.py @@ -24,7 +24,7 @@ def test_simple(): output_ids = np.array([0, 0], dtype=np.int32) coef = np.array([0.4, 1]) intercept = np.array([1], dtype=float) - sample_weight = np.array([1, 1, 1], dtype=float) + sample_weight = np.array([1, 1, 1], dtype=float).reshape(-1, 1) y_hat = NARX._predict( @@ -72,6 +72,7 @@ def test_simple(): feat_ids, delay_ids, output_ids, + True, np.sqrt(sample_weight), grad_yyd_ids, grad_coef_ids, @@ -80,6 +81,37 @@ def test_simple(): ) assert_almost_equal(grad.sum(axis=0), grad_truth, decimal=4) + grad_0 = NARX._grad( + np.r_[coef_1, 0], + _predict_step, + X, + y, + feat_ids, + delay_ids, + output_ids, + True, + np.sqrt(sample_weight), + grad_yyd_ids, + grad_coef_ids, + grad_feat_ids, + grad_delay_ids, + ) + grad = NARX._grad( + coef_1, + _predict_step, + X, + y, + feat_ids, + delay_ids, + output_ids, + False, + np.sqrt(sample_weight), + grad_yyd_ids, + grad_coef_ids, + grad_feat_ids, + grad_delay_ids, + ) + assert_almost_equal(grad.sum(axis=0), grad_0.sum(axis=0)[:-1]) def test_complex(): """Complex model""" @@ -172,6 +204,7 @@ def test_complex(): feat_ids, delay_ids, output_ids, + True, np.sqrt(np.ones((y.shape[0], 1))), grad_yyd_ids, grad_coef_ids, @@ -219,6 +252,52 @@ def test_complex(): assert_allclose(grad.sum(axis=0)[i], grad_num.sum(), rtol=1e-1) + grad = NARX._grad( + coef, + _predict_step, + X, + y, + feat_ids, + delay_ids, + output_ids, + False, + np.sqrt(np.ones((y.shape[0], 1))), + grad_yyd_ids, + grad_coef_ids, + grad_feat_ids, + grad_delay_ids, + ) + y_hat_0 = NARX._predict( + _predict_step, + X=X, + y_ref=y, + coef=coef, + intercept=[0, 0], + feat_ids=feat_ids, + delay_ids=delay_ids, + output_ids=output_ids, + ) + e_0 = y_hat_0 - y + + for i in range(len(coef)): + coef_1 = np.copy(coef) + coef_1[i] += delta_w + + y_hat_1 = NARX._predict( + _predict_step, + X=X, + y_ref=y, + coef=coef_1, + intercept=[0, 0], + feat_ids=feat_ids, + delay_ids=delay_ids, + output_ids=output_ids, + ) + + e_1 = y_hat_1 - y + grad_num = (e_1 - e_0).sum(axis=1) / delta_w + + assert_allclose(grad.sum(axis=0)[i], grad_num.sum(), rtol=1e-1) def test_score_nan(): """Test fitting scores when data contain nan."""