|
1 | 1 | import numpy as np
|
2 | 2 | import pandas as pd
|
3 | 3 | import pymc as pm
|
| 4 | +import pytest |
4 | 5 |
|
5 | 6 | from numpy.testing import assert_allclose
|
6 | 7 | from pytensor import config
|
|
15 | 16 | RTOL = 0 if config.floatX.endswith("64") else 1e-6
|
16 | 17 |
|
17 | 18 |
|
18 |
| -def test_exogenous_component(rng): |
19 |
| - data = rng.normal(size=(100, 2)).astype(config.floatX) |
20 |
| - mod = st.RegressionComponent(state_names=["feature_1", "feature_2"], name="exog") |
| 19 | +@pytest.fixture |
| 20 | +def regression_data(rng): |
| 21 | + """Generate test data for regression components (2 exogenous variables).""" |
| 22 | + return rng.normal(size=(100, 2)).astype(config.floatX) |
21 | 23 |
|
22 |
| - params = {"beta_exog": np.array([1.0, 2.0], dtype=config.floatX)} |
23 |
| - exog_data = {"data_exog": data} |
24 |
| - x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
25 | 24 |
|
26 |
| - # Check that the generated data is just a linear regression |
27 |
| - assert_allclose(y, data @ params["beta_exog"], atol=ATOL, rtol=RTOL) |
| 25 | +@pytest.fixture |
| 26 | +def multiple_regression_data(rng): |
| 27 | + """Generate test data for multiple regression components.""" |
| 28 | + return { |
| 29 | + "data_1": rng.normal(size=(100, 2)).astype(config.floatX), |
| 30 | + "data_2": rng.normal(size=(100, 1)).astype(config.floatX), |
| 31 | + } |
28 | 32 |
|
29 |
| - mod = mod.build(verbose=False) |
30 |
| - _assert_basic_coords_correct(mod) |
31 |
| - assert mod.coords["state_exog"] == ["feature_1", "feature_2"] |
32 | 33 |
|
33 |
| - |
34 |
| -def test_adding_exogenous_component(rng): |
35 |
| - data = rng.normal(size=(100, 2)).astype(config.floatX) |
36 |
| - reg = st.RegressionComponent(state_names=["a", "b"], name="exog") |
37 |
| - ll = st.LevelTrendComponent(name="level") |
38 |
| - |
39 |
| - seasonal = st.FrequencySeasonality(name="annual", season_length=12, n=4) |
40 |
| - mod = reg + ll + seasonal |
41 |
| - |
42 |
| - assert mod.ssm["design"].eval({"data_exog": data}).shape == (100, 1, 2 + 2 + 8) |
43 |
| - assert_allclose(mod.ssm["design", 5, 0, :2].eval({"data_exog": data}), data[5]) |
44 |
| - |
45 |
| - |
46 |
| -def test_regression_with_multiple_observed_states(rng): |
47 |
| - from scipy.linalg import block_diag |
48 |
| - |
49 |
| - data = rng.normal(size=(100, 2)).astype(config.floatX) |
50 |
| - mod = st.RegressionComponent( |
51 |
| - state_names=["feature_1", "feature_2"], |
52 |
| - name="exog", |
53 |
| - observed_state_names=["data_1", "data_2"], |
54 |
| - ) |
55 |
| - |
56 |
| - params = {"beta_exog": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX)} |
57 |
| - exog_data = {"data_exog": data} |
58 |
| - x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
59 |
| - |
60 |
| - assert x.shape == (100, 4) # 2 features, 2 states |
61 |
| - assert y.shape == (100, 2) |
62 |
| - |
63 |
| - # Check that the generated data are two independent linear regressions |
64 |
| - assert_allclose(y[:, 0], data @ params["beta_exog"][0], atol=ATOL, rtol=RTOL) |
65 |
| - assert_allclose(y[:, 1], data @ params["beta_exog"][1], atol=ATOL, rtol=RTOL) |
66 |
| - |
67 |
| - mod = mod.build(verbose=False) |
68 |
| - assert mod.coords["state_exog"] == [ |
69 |
| - "feature_1", |
70 |
| - "feature_2", |
71 |
| - ] |
72 |
| - |
73 |
| - Z = mod.ssm["design"].eval({"data_exog": data}) |
74 |
| - vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
75 |
| - assert Z.shape == (100, 2, 4) |
76 |
| - assert np.allclose(Z, vec_block_diag(data[:, None, :], data[:, None, :])) |
77 |
| - |
78 |
| - |
79 |
| -def test_add_regression_components_with_multiple_observed_states(rng): |
80 |
| - from scipy.linalg import block_diag |
81 |
| - |
82 |
| - data_1 = rng.normal(size=(100, 2)).astype(config.floatX) |
83 |
| - data_2 = rng.normal(size=(100, 1)).astype(config.floatX) |
84 |
| - |
85 |
| - reg1 = st.RegressionComponent( |
86 |
| - state_names=["a", "b"], name="exog1", observed_state_names=["data_1", "data_2"] |
87 |
| - ) |
88 |
| - reg2 = st.RegressionComponent(state_names=["c"], name="exog2", observed_state_names=["data_3"]) |
89 |
| - |
90 |
| - mod = (reg1 + reg2).build(verbose=False) |
91 |
| - assert mod.coords["state_exog1"] == ["a", "b"] |
92 |
| - assert mod.coords["state_exog2"] == ["c"] |
93 |
| - |
94 |
| - Z = mod.ssm["design"].eval({"data_exog1": data_1, "data_exog2": data_2}) |
95 |
| - vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
96 |
| - assert Z.shape == (100, 3, 5) |
97 |
| - assert np.allclose( |
98 |
| - Z, |
99 |
| - vec_block_diag(vec_block_diag(data_1[:, None, :], data_1[:, None, :]), data_2[:, None, :]), |
100 |
| - ) |
101 |
| - |
102 |
| - x0 = mod.ssm["initial_state"].eval( |
103 |
| - { |
104 |
| - "beta_exog1": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX), |
105 |
| - "beta_exog2": np.array([5.0], dtype=config.floatX), |
106 |
| - } |
107 |
| - ) |
108 |
| - np.testing.assert_allclose(x0, np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=config.floatX)) |
109 |
| - |
110 |
| - |
111 |
| -def test_filter_scans_time_varying_design_matrix(rng): |
| 34 | +@pytest.fixture |
| 35 | +def time_series_data(rng): |
| 36 | + """Generate time series data for PyMC integration tests.""" |
112 | 37 | time_idx = pd.date_range(start="2000-01-01", freq="D", periods=100)
|
113 | 38 | data = pd.DataFrame(rng.normal(size=(100, 2)), columns=["a", "b"], index=time_idx)
|
114 |
| - |
115 | 39 | y = pd.DataFrame(rng.normal(size=(100, 1)), columns=["data"], index=time_idx)
|
116 |
| - |
117 |
| - reg = st.RegressionComponent(state_names=["a", "b"], name="exog") |
118 |
| - mod = reg.build(verbose=False) |
119 |
| - |
120 |
| - with pm.Model(coords=mod.coords) as m: |
121 |
| - data_exog = pm.Data("data_exog", data.values) |
122 |
| - |
123 |
| - x0 = pm.Normal("x0", dims=["state"]) |
124 |
| - P0 = pm.Deterministic("P0", pt.eye(mod.k_states), dims=["state", "state_aux"]) |
125 |
| - beta_exog = pm.Normal("beta_exog", dims=["state_exog"]) |
126 |
| - |
127 |
| - mod.build_statespace_graph(y) |
128 |
| - x0, P0, c, d, T, Z, R, H, Q = mod.unpack_statespace() |
129 |
| - pm.Deterministic("Z", Z) |
130 |
| - |
131 |
| - prior = pm.sample_prior_predictive(draws=10) |
132 |
| - |
133 |
| - prior_Z = prior.prior.Z.values |
134 |
| - assert prior_Z.shape == (1, 10, 100, 1, 2) |
135 |
| - assert_allclose(prior_Z[0, :, :, 0, :], data.values[None].repeat(10, axis=0)) |
| 40 | + return data, y |
| 41 | + |
| 42 | + |
| 43 | +class TestRegressionComponent: |
| 44 | + """Test basic regression component functionality.""" |
| 45 | + |
| 46 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 47 | + def test_exogenous_component(self, rng, regression_data, innovations): |
| 48 | + """Test basic regression component with and without innovations.""" |
| 49 | + mod = st.RegressionComponent( |
| 50 | + state_names=["feature_1", "feature_2"], name="exog", innovations=innovations |
| 51 | + ) |
| 52 | + |
| 53 | + params = {"beta_exog": np.array([1.0, 2.0], dtype=config.floatX)} |
| 54 | + if innovations: |
| 55 | + params["sigma_beta_exog"] = np.array([0.1, 0.2], dtype=config.floatX) |
| 56 | + |
| 57 | + exog_data = {"data_exog": regression_data} |
| 58 | + x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
| 59 | + |
| 60 | + if not innovations: |
| 61 | + # Check that the generated data is just a linear regression |
| 62 | + assert_allclose(y, regression_data @ params["beta_exog"], atol=ATOL, rtol=RTOL) |
| 63 | + else: |
| 64 | + # With innovations, the coefficients should vary over time |
| 65 | + # The initial state should match the beta parameters |
| 66 | + assert_allclose(x[0], params["beta_exog"], atol=ATOL, rtol=RTOL) |
| 67 | + |
| 68 | + mod = mod.build(verbose=False) |
| 69 | + _assert_basic_coords_correct(mod) |
| 70 | + assert mod.coords["state_exog"] == ["feature_1", "feature_2"] |
| 71 | + |
| 72 | + if innovations: |
| 73 | + # Check that sigma_beta parameter is included |
| 74 | + assert "sigma_beta_exog" in mod.param_names |
| 75 | + |
| 76 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 77 | + def test_adding_exogenous_component(self, rng, regression_data, innovations): |
| 78 | + """Test adding regression component to other components.""" |
| 79 | + reg = st.RegressionComponent(state_names=["a", "b"], name="exog", innovations=innovations) |
| 80 | + ll = st.LevelTrendComponent(name="level") |
| 81 | + seasonal = st.FrequencySeasonality(name="annual", season_length=12, n=4) |
| 82 | + mod = reg + ll + seasonal |
| 83 | + |
| 84 | + assert mod.ssm["design"].eval({"data_exog": regression_data}).shape == (100, 1, 2 + 2 + 8) |
| 85 | + assert_allclose( |
| 86 | + mod.ssm["design", 5, 0, :2].eval({"data_exog": regression_data}), regression_data[5] |
| 87 | + ) |
| 88 | + |
| 89 | + if innovations: |
| 90 | + # Check that sigma_beta parameter is included in the combined model |
| 91 | + assert "sigma_beta_exog" in mod.param_names |
| 92 | + |
| 93 | + |
| 94 | +class TestMultivariateRegression: |
| 95 | + """Test multivariate regression functionality.""" |
| 96 | + |
| 97 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 98 | + def test_regression_with_multiple_observed_states(self, rng, regression_data, innovations): |
| 99 | + """Test multivariate regression with and without innovations.""" |
| 100 | + from scipy.linalg import block_diag |
| 101 | + |
| 102 | + mod = st.RegressionComponent( |
| 103 | + state_names=["feature_1", "feature_2"], |
| 104 | + name="exog", |
| 105 | + observed_state_names=["data_1", "data_2"], |
| 106 | + innovations=innovations, |
| 107 | + ) |
| 108 | + |
| 109 | + params = {"beta_exog": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX)} |
| 110 | + if innovations: |
| 111 | + params["sigma_beta_exog"] = np.array([[0.1, 0.2], [0.3, 0.4]], dtype=config.floatX) |
| 112 | + |
| 113 | + exog_data = {"data_exog": regression_data} |
| 114 | + x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
| 115 | + |
| 116 | + assert x.shape == (100, 4) # 2 features, 2 states |
| 117 | + assert y.shape == (100, 2) |
| 118 | + |
| 119 | + if not innovations: |
| 120 | + # Check that the generated data are two independent linear regressions |
| 121 | + assert_allclose(y[:, 0], regression_data @ params["beta_exog"][0], atol=ATOL, rtol=RTOL) |
| 122 | + assert_allclose(y[:, 1], regression_data @ params["beta_exog"][1], atol=ATOL, rtol=RTOL) |
| 123 | + else: |
| 124 | + # Check that initial states match the beta parameters |
| 125 | + assert_allclose(x[0, :2], params["beta_exog"][0], atol=ATOL, rtol=RTOL) |
| 126 | + assert_allclose(x[0, 2:], params["beta_exog"][1], atol=ATOL, rtol=RTOL) |
| 127 | + |
| 128 | + mod = mod.build(verbose=False) |
| 129 | + assert mod.coords["state_exog"] == ["feature_1", "feature_2"] |
| 130 | + |
| 131 | + Z = mod.ssm["design"].eval({"data_exog": regression_data}) |
| 132 | + vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
| 133 | + assert Z.shape == (100, 2, 4) |
| 134 | + assert np.allclose( |
| 135 | + Z, |
| 136 | + vec_block_diag(regression_data[:, None, :], regression_data[:, None, :]), |
| 137 | + ) |
| 138 | + |
| 139 | + if innovations: |
| 140 | + # Check that sigma_beta parameter is included |
| 141 | + assert "sigma_beta_exog" in mod.param_names |
| 142 | + |
| 143 | + |
| 144 | +class TestMultipleRegressionComponents: |
| 145 | + """Test multiple regression components functionality.""" |
| 146 | + |
| 147 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 148 | + def test_add_regression_components_with_multiple_observed_states( |
| 149 | + self, rng, multiple_regression_data, innovations |
| 150 | + ): |
| 151 | + """Test adding multiple regression components with and without innovations.""" |
| 152 | + from scipy.linalg import block_diag |
| 153 | + |
| 154 | + reg1 = st.RegressionComponent( |
| 155 | + state_names=["a", "b"], |
| 156 | + name="exog1", |
| 157 | + observed_state_names=["data_1", "data_2"], |
| 158 | + innovations=innovations, |
| 159 | + ) |
| 160 | + reg2 = st.RegressionComponent( |
| 161 | + state_names=["c"], |
| 162 | + name="exog2", |
| 163 | + observed_state_names=["data_3"], |
| 164 | + innovations=innovations, |
| 165 | + ) |
| 166 | + |
| 167 | + mod = (reg1 + reg2).build(verbose=False) |
| 168 | + assert mod.coords["state_exog1"] == ["a", "b"] |
| 169 | + assert mod.coords["state_exog2"] == ["c"] |
| 170 | + |
| 171 | + Z = mod.ssm["design"].eval( |
| 172 | + { |
| 173 | + "data_exog1": multiple_regression_data["data_1"], |
| 174 | + "data_exog2": multiple_regression_data["data_2"], |
| 175 | + } |
| 176 | + ) |
| 177 | + vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
| 178 | + assert Z.shape == (100, 3, 5) |
| 179 | + assert np.allclose( |
| 180 | + Z, |
| 181 | + vec_block_diag( |
| 182 | + vec_block_diag( |
| 183 | + multiple_regression_data["data_1"][:, None, :], |
| 184 | + multiple_regression_data["data_1"][:, None, :], |
| 185 | + ), |
| 186 | + multiple_regression_data["data_2"][:, None, :], |
| 187 | + ), |
| 188 | + ) |
| 189 | + |
| 190 | + x0 = mod.ssm["initial_state"].eval( |
| 191 | + { |
| 192 | + "beta_exog1": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX), |
| 193 | + "beta_exog2": np.array([5.0], dtype=config.floatX), |
| 194 | + } |
| 195 | + ) |
| 196 | + np.testing.assert_allclose(x0, np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=config.floatX)) |
| 197 | + |
| 198 | + if innovations: |
| 199 | + # Check that sigma_beta parameters are included |
| 200 | + assert "sigma_beta_exog1" in mod.param_names |
| 201 | + assert "sigma_beta_exog2" in mod.param_names |
| 202 | + |
| 203 | + |
| 204 | +class TestPyMCIntegration: |
| 205 | + """Test PyMC integration functionality.""" |
| 206 | + |
| 207 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 208 | + def test_filter_scans_time_varying_design_matrix(self, rng, time_series_data, innovations): |
| 209 | + """Test PyMC integration with and without innovations.""" |
| 210 | + data, y = time_series_data |
| 211 | + |
| 212 | + reg = st.RegressionComponent(state_names=["a", "b"], name="exog", innovations=innovations) |
| 213 | + mod = reg.build(verbose=False) |
| 214 | + |
| 215 | + with pm.Model(coords=mod.coords) as m: |
| 216 | + data_exog = pm.Data("data_exog", data.values) |
| 217 | + |
| 218 | + x0 = pm.Normal("x0", dims=["state"]) |
| 219 | + P0 = pm.Deterministic("P0", pt.eye(mod.k_states), dims=["state", "state_aux"]) |
| 220 | + beta_exog = pm.Normal("beta_exog", dims=["state_exog"]) |
| 221 | + |
| 222 | + if innovations: |
| 223 | + sigma_beta_exog = pm.Exponential("sigma_beta_exog", 1, dims=["state_exog"]) |
| 224 | + |
| 225 | + mod.build_statespace_graph(y) |
| 226 | + x0, P0, c, d, T, Z, R, H, Q = mod.unpack_statespace() |
| 227 | + pm.Deterministic("Z", Z) |
| 228 | + |
| 229 | + prior = pm.sample_prior_predictive(draws=10) |
| 230 | + |
| 231 | + prior_Z = prior.prior.Z.values |
| 232 | + assert prior_Z.shape == (1, 10, 100, 1, 2) |
| 233 | + assert_allclose(prior_Z[0, :, :, 0, :], data.values[None].repeat(10, axis=0)) |
| 234 | + |
| 235 | + if innovations: |
| 236 | + # Check that sigma_beta parameter is included in the prior |
| 237 | + assert "sigma_beta_exog" in prior.prior.data_vars |
0 commit comments