 floatX = pytensor.config.floatX
 
 
-# TODO: check test for error_var=True, since there are problems with statsmodels, the matrices looks the same by some experiments done in notebooks
-# (FAILED tests/statespace/models/test_DFM.py::test_DFM_update_matches_statsmodels[True-2-2-2] - numpy.linalg.LinAlgError: 1-th leading minor of the array is not positive definite)
-
 
 @pytest.fixture(scope="session")
 def data():
@@ -43,7 +40,7 @@ def create_sm_test_values_mapping(
     """Convert PyMC test values to statsmodels parameter format"""
     sm_test_values = {}
 
-    # 1. Factor loadings: PyMC shape (n_endog, k_factors) -> statsmodels individual params
+    # Factor loadings: PyMC shape (n_endog, k_factors) -> statsmodels individual params
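+    # (statsmodels exposes one scalar per (series, factor) pair, named like "loading.f1.<series>"; name template assumed from statsmodels' param_names)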
     factor_loadings = test_values["factor_loadings"]
     all_pairs = product(data.columns, range(1, k_factors + 1))
     sm_test_values.update(
@@ -53,7 +50,7 @@ def create_sm_test_values_mapping(
         }
     )
 
-    # 2. Factor AR coefficients: PyMC shape (k_factors, factor_order*k_factors) -> L{lag}.f{to}.f{from}
+    # Factor AR coefficients: PyMC shape (k_factors, factor_order*k_factors) -> L{lag}.f{to}.f{from}
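+    # (e.g. "L1.f2.f1" is the lag-1 coefficient of factor 1 in factor 2's equation, reading from the name template above)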
     if factor_order > 0 and "factor_ar" in test_values:
         factor_ar = test_values["factor_ar"]
         triplets = product(
@@ -68,7 +65,7 @@ def create_sm_test_values_mapping(
             }
         )
 
-    # 3a. Error AR coefficients: PyMC shape (n_endog, error_order) -> L{lag}.e(var).e(var)
+    # Error AR coefficients (error_var=False): PyMC shape (n_endog, error_order) -> L{lag}.e(var).e(var)
     if error_order > 0 and not error_var and "error_ar" in test_values:
         error_ar = test_values["error_ar"]
         pairs = product(enumerate(data.columns), range(1, error_order + 1))
@@ -79,7 +76,7 @@ def create_sm_test_values_mapping(
             }
         )
 
-    # 3b. Error AR coefficients: PyMC shape (n_endog, error_order * n_endog) -> L{lag}.e(var).e(var)
+    # Error AR coefficients (error_var=True): PyMC shape (n_endog, error_order * n_endog) -> L{lag}.e(var).e(var)
     elif error_order > 0 and error_var and "error_ar" in test_values:
         error_ar = test_values["error_ar"]
         triplets = product(
@@ -97,7 +94,7 @@ def create_sm_test_values_mapping(
             }
         )
 
-    # 4. Observation error variances:
+    # Observation error variances:
     if "error_sigma" in test_values:
         error_sigma = test_values["error_sigma"]
         sm_test_values.update(
@@ -113,10 +110,15 @@ def create_sm_test_values_mapping(
 @pytest.mark.parametrize("k_factors", [1, 2])
 @pytest.mark.parametrize("factor_order", [0, 1, 2])
 @pytest.mark.parametrize("error_order", [0, 1, 2])
-@pytest.mark.parametrize("error_var", [False])
+@pytest.mark.parametrize("error_var", [True, False])
 @pytest.mark.filterwarnings("ignore::statsmodels.tools.sm_exceptions.EstimationWarning")
 @pytest.mark.filterwarnings("ignore::FutureWarning")
 def test_DFM_update_matches_statsmodels(data, k_factors, factor_order, error_order, error_var, rng):
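+    # Suspected upstream statsmodels problem; xfail these cases rather than excluding error_var=True from the test matrix.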
+    if error_var and (factor_order > 0 or error_order > 0):
+        pytest.xfail(
+            "Statsmodels may be doing something wrong with error_var=True and (factor_order > 0 or error_order > 0) [numpy.linalg.LinAlgError: 1-th leading minor of the array is not positive definite]"
+        )
+
     mod = BayesianDynamicFactor(
         k_factors=k_factors,
         factor_order=factor_order,
@@ -155,14 +157,13 @@ def test_DFM_update_matches_statsmodels(data, k_factors, factor_order, error_order, error_var, rng):
         test_values, data, k_factors, factor_order, error_order, error_var
     )
 
-    # Initialize and constrain statsmodels model
     x0 = test_values["x0"]
     P0 = test_values["P0"]
 
     sm_dfm.initialize_known(initial_state=x0, initial_state_cov=P0)
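+    # With every parameter constrained, fit_constrained has nothing left to estimate, so this effectively evaluates the statsmodels model at the PyMC test values.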
     sm_dfm.fit_constrained({name: sm_test_values[name] for name in sm_dfm.param_names})
 
-    # Get PyMC matrices using the same pattern as ETS test
+    # Get PyMC matrices
     matrices = mod._unpack_statespace_with_placeholders()
     inputs = list(explicit_graph_inputs(matrices))
     input_names = [x.name for x in inputs]
@@ -238,10 +239,8 @@ def simulate_from_numpy_model(
 
 
 @pytest.mark.parametrize("n_obs,n_runs", [(100, 200)])
-def test_exog_betas_random_walk(n_obs, n_runs):
+def test_DFM_exog_betas_random_walk(n_obs, n_runs):
     rng = np.random.default_rng(123)
-
-    # Example model
     dfm_mod = BayesianDynamicFactor(
         k_factors=1,
         factor_order=1,
@@ -255,7 +254,7 @@ def test_exog_betas_random_walk(n_obs, n_runs):
         measurement_error=False,
     )
 
-    # Parameters
+    # Arbitrary parameters
     param_dict = {
         "factor_loadings": np.array([[0.9], [0.8]]),
         "factor_ar": np.array([[0.5]]),
@@ -284,14 +283,13 @@ def test_exog_betas_random_walk(n_obs, n_runs):
     var_t1 = betas_t1.var(axis=0)
     var_t100 = betas_t100.var(axis=0)
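+    # Betas follow a random walk, so their variance across simulated runs should grow with time.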
 
-    # ---- Assertion ----
     assert np.all(
         var_t100 > var_t1
     ), f"Expected variance at T=100 > T=1, got {var_t1} vs {var_t100}"
 
 
 @pytest.mark.parametrize("shared", [True, False])
-def test_exog_shared_vs_not(shared):
+def test_DFM_exog_shared_vs_not(shared):
     rng = np.random.default_rng(123)
 
     n_obs = 50
@@ -301,7 +299,6 @@ def test_exog_shared_vs_not(shared):
     # Dummy exogenous data
    exog = rng.normal(size=(n_obs, k_exog))
 
-    # Create the model
     dfm_mod = BayesianDynamicFactor(
         k_factors=1,
         factor_order=1,
@@ -313,34 +310,60 @@ def test_exog_shared_vs_not(shared):
         error_cov_type="diagonal",
         measurement_error=False,
     )
+
     k_exog_states = dfm_mod.k_exog * dfm_mod.k_endog if not shared else dfm_mod.k_exog
 
-    # Dummy parameters (small values so simulation is stable)
+    if shared:
+        beta = np.array([0.3, 0.5])
+    else:
+        beta = np.array([0.3, 0.5, 1.0, 2.0])
+
     param_dict = {
         "factor_loadings": np.array([[0.9], [0.8]]),
         "factor_ar": np.array([[0.5]]),
         "error_ar": np.array([[0.4], [0.3]]),
         "error_sigma": np.array([0.1, 0.2]),
         "P0": np.eye(dfm_mod.k_states),
         "x0": np.zeros(dfm_mod.k_states - k_exog_states),
-        "beta": np.array([0.3, 0.5, 1, 2]) if not shared else np.array([0.3, 0.5]),
+        "beta": beta,
     }
 
     data_dict = {"exog_data": exog}
 
     # Simulate trajectory
     x_traj, y_traj = simulate_from_numpy_model(dfm_mod, rng, param_dict, data_dict, steps=n_obs)
 
-    # Extract contribution from exogenous variables at time t
-    beta = param_dict["beta"].reshape(-1, k_exog)  # shape depends on shared flag
-    exog_t = exog[10]  # pick a random time point
+    # Test 1: Check hidden states
+    # Extract exogenous hidden states at time t=10
+    t = 10
+    exog_states_start = dfm_mod.k_states - k_exog_states
+    exog_states_end = dfm_mod.k_states
+    exog_hidden_states = x_traj[t, exog_states_start:exog_states_end]
+
+    if shared:
+        # When shared=True, there should be k_exog states total
+        assert len(exog_hidden_states) == k_exog
+    else:
+        # When shared=False, there should be k_exog * k_endog states
+        assert len(exog_hidden_states) == k_exog * k_endog
+        # Each endogenous variable has its own set of exogenous states
+        exog_states_reshaped = exog_hidden_states.reshape(k_endog, k_exog)
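+        # NOTE: the reshape assumes the exog states are laid out endog-major, one block of k_exog betas per endogenous series.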
+        assert not np.allclose(exog_states_reshaped[0], exog_states_reshaped[1])
+
+    # Test 2: Check observed contributions
+    exog_t = exog[t]
 
     if shared:
-        # all endogs get the same contribution
+        # All endogenous variables get the same beta * data contribution
         contributions = [beta @ exog_t for _ in range(k_endog)]
-        assert np.allclose(contributions[0], contributions[1:])
+        assert np.allclose(
+            contributions[0], contributions[1]
+        ), "Expected same contribution for all endog when shared=True"
     else:
-        # each endog gets a different contribution
-        contributions = [beta[i] @ exog_t for i in range(k_endog)]
-        # check that at least one differs
-        assert not np.allclose(contributions[0], contributions[1:])
+        # Each endogenous variable gets a different beta * data contribution
+        beta_reshaped = beta.reshape(k_endog, k_exog)
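+        # (beta is flattened in the same assumed endog-major order as the hidden states above)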
+        contributions = [beta_reshaped[i] @ exog_t for i in range(k_endog)]
+        # Check that contributions are different
+        assert not np.allclose(
+            contributions[0], contributions[1]
+        ), f"Expected different contributions, got {contributions}"