1
+ from itertools import product
2
+
1
3
import numpy as np
2
4
import pandas as pd
3
5
import pymc as pm
7
9
import statsmodels .api as sm
8
10
9
11
from numpy .testing import assert_allclose
12
+ from pytensor .graph .basic import explicit_graph_inputs
10
13
from statsmodels .tsa .statespace .dynamic_factor import DynamicFactor
11
14
12
15
from pymc_extras .statespace .models .DFM import BayesianDynamicFactor
13
- from pymc_extras .statespace .utils .constants import SHORT_NAME_TO_LONG
16
+ from pymc_extras .statespace .utils .constants import LONG_MATRIX_NAMES
14
17
from tests .statespace .shared_fixtures import rng
15
18
16
19
floatX = pytensor .config .floatX
@@ -27,95 +30,65 @@ def data():
27
30
return df
28
31
29
32
30
- @pytest .mark .parametrize ("k_factors" , [1 , 2 ])
31
- @pytest .mark .parametrize ("factor_order" , [0 , 1 , 2 ])
32
- @pytest .mark .parametrize ("error_order" , [0 , 1 , 2 ])
33
- def test_dfm_parameter_and_matrix_match (data , k_factors , factor_order , error_order ):
34
- # --- Statsmodels DFM ---
35
- sm_dfm = DynamicFactor (
36
- endog = data ,
37
- k_factors = k_factors ,
38
- factor_order = factor_order ,
39
- error_order = error_order ,
40
- )
41
-
42
- # Use deterministic small parameters for reproducibility
43
- param_array = np .full (len (sm_dfm .param_names ), 0.5 )
44
- sm_dfm .update (param_array )
45
-
46
- # Only request matrices that actually exist in ssm.__getitem__
47
- valid_names = ["design" , "obs_cov" , "transition" , "state_cov" , "selection" ]
48
- sm_matrices = {name : np .array (sm_dfm .ssm [name ]) for name in valid_names }
33
+ def create_sm_test_values_mapping (test_values , data , k_factors , factor_order , error_order ):
34
+ """Convert PyMC test values to statsmodels parameter format"""
35
+ sm_test_values = {}
49
36
50
- # --- PyMC DFM ---
51
- mod = BayesianDynamicFactor (
52
- k_factors = k_factors ,
53
- factor_order = factor_order ,
54
- k_endog = data . shape [ 1 ],
55
- error_order = error_order ,
56
- measurement_error = False ,
57
- verbose = False ,
37
+ # 1. Factor loadings: PyMC shape (n_endog, k_factors) -> statsmodels individual params
38
+ factor_loadings = test_values [ "factor_loadings" ]
39
+ all_pairs = product ( data . columns , range ( 1 , k_factors + 1 ))
40
+ sm_test_values . update (
41
+ {
42
+ f"loading.f { factor_idx } . { endog_name } " : value
43
+ for ( endog_name , factor_idx ), value in zip ( all_pairs , factor_loadings . ravel ())
44
+ }
58
45
)
59
46
60
- coords = mod .coords
61
- with pm .Model (coords = coords ):
62
- k_endog = data .shape [1 ]
63
- factor_part = max (1 , factor_order ) * k_factors
64
- error_part = error_order * k_endog if error_order > 0 else 0
65
- k_states = factor_part + error_part
66
-
67
- pm .Deterministic ("x0" , pt .constant (np .full ((k_states ,), 0.5 ), dtype = floatX ))
68
- pm .Deterministic ("P0" , pt .constant (np .full ((k_states , k_states ), 0.5 ), dtype = floatX ))
69
- pm .Deterministic (
70
- "factor_loadings" , pt .constant (np .full ((k_endog , k_factors ), 0.5 ), dtype = floatX )
47
+ # 2. Factor AR coefficients: PyMC shape (k_factors, factor_order*k_factors) -> L{lag}.f{to}.f{from}
48
+ if factor_order > 0 and "factor_ar" in test_values :
49
+ factor_ar = test_values ["factor_ar" ]
50
+ triplets = product (
51
+ range (1 , k_factors + 1 ), range (1 , factor_order + 1 ), range (1 , k_factors + 1 )
52
+ )
53
+ sm_test_values .update (
54
+ {
55
+ f"L{ lag } .f{ to_factor } .f{ from_factor } " : factor_ar [
56
+ from_factor - 1 , (lag - 1 ) * k_factors + (to_factor - 1 )
57
+ ]
58
+ for from_factor , lag , to_factor in triplets
59
+ }
71
60
)
72
61
73
- if factor_order > 0 :
74
- pm .Deterministic (
75
- "factor_ar" ,
76
- pt .constant (np .full ((k_factors , factor_order * k_factors ), 0.5 ), dtype = floatX ),
77
- )
78
- if error_order > 0 :
79
- pm .Deterministic (
80
- "error_ar" , pt .constant (np .full ((k_endog , error_order ), 0.5 ), dtype = floatX )
81
- )
82
- # pm.Deterministic("factor_sigma", pt.constant(np.full((k_factors,), 0.5), dtype=floatX))
83
- pm .Deterministic ("error_sigma" , pt .constant (np .full ((k_endog ,), 0.5 ), dtype = floatX ))
84
- pm .Deterministic ("sigma_obs" , pt .constant (np .full ((k_endog ,), 0.5 ), dtype = floatX ))
85
-
86
- mod ._insert_random_variables ()
87
-
88
- pymc_matrices = pm .draw (mod .subbed_ssm )
89
- pymc_matrices = dict (zip (SHORT_NAME_TO_LONG .values (), pymc_matrices ))
90
-
91
- # --- Compare ---
92
- for mat_name in valid_names :
93
- assert_allclose (
94
- pymc_matrices [mat_name ],
95
- sm_matrices [mat_name ],
96
- atol = 1e-12 ,
97
- err_msg = f"Matrix mismatch: { mat_name } (k_factors={ k_factors } , factor_order={ factor_order } , error_order={ error_order } )" ,
62
+ # 3. Error AR coefficients: PyMC shape (n_endog, error_order) -> L{lag}.e(var).e(var)
63
+ if error_order > 0 and "error_ar" in test_values :
64
+ error_ar = test_values ["error_ar" ]
65
+ pairs = product (enumerate (data .columns ), range (1 , error_order + 1 ))
66
+ sm_test_values .update (
67
+ {
68
+ f"L{ lag } .e({ endog_name } ).e({ endog_name } )" : error_ar [endog_idx , lag - 1 ]
69
+ for (endog_idx , endog_name ), lag in pairs
70
+ }
98
71
)
99
72
73
+ # 4. Observation error variances:
74
+ if "error_sigma" in test_values :
75
+ error_sigma = test_values ["error_sigma" ]
76
+ sm_test_values .update (
77
+ {
78
+ f"sigma2.{ endog_name } " : error_sigma [endog_idx ]
79
+ for endog_idx , endog_name in enumerate (data .columns )
80
+ }
81
+ )
82
+
83
+ return sm_test_values
84
+
100
85
101
86
@pytest .mark .parametrize ("k_factors" , [1 , 2 ])
102
87
@pytest .mark .parametrize ("factor_order" , [0 , 1 , 2 ])
103
- @pytest .mark .parametrize ("error_order" , [1 , 2 , 3 ])
88
+ @pytest .mark .parametrize ("error_order" , [0 , 1 , 2 ])
104
89
@pytest .mark .filterwarnings ("ignore::statsmodels.tools.sm_exceptions.EstimationWarning" )
105
90
@pytest .mark .filterwarnings ("ignore::FutureWarning" )
106
91
def test_DFM_update_matches_statsmodels (data , k_factors , factor_order , error_order , rng ):
107
- # --- Fit Statsmodels DynamicFactor with random small params ---
108
- sm_dfm = DynamicFactor (
109
- endog = data ,
110
- k_factors = k_factors ,
111
- factor_order = factor_order ,
112
- error_order = error_order ,
113
- )
114
- param_names = sm_dfm .param_names
115
- param_dict = {param : getattr (np , floatX )(rng .normal (scale = 0.1 ) ** 2 ) for param in param_names }
116
- sm_res = sm_dfm .fit_constrained (param_dict )
117
-
118
- # --- Setup BayesianDynamicFactor ---
119
92
mod = BayesianDynamicFactor (
120
93
k_factors = k_factors ,
121
94
factor_order = factor_order ,
@@ -124,96 +97,51 @@ def test_DFM_update_matches_statsmodels(data, k_factors, factor_order, error_ord
124
97
measurement_error = False ,
125
98
verbose = False ,
126
99
)
100
+ sm_dfm = DynamicFactor (
101
+ endog = data ,
102
+ k_factors = k_factors ,
103
+ factor_order = factor_order ,
104
+ error_order = error_order ,
105
+ )
106
+
107
+ # Generate test values for PyMC model
108
+ test_values = {}
109
+ test_values ["x0" ] = rng .normal (size = mod .k_states )
110
+ test_values ["P0" ] = np .eye (mod .k_states ) # Use identity for stability
111
+ test_values ["factor_loadings" ] = rng .normal (size = (data .shape [1 ], k_factors ))
112
+
113
+ if factor_order > 0 :
114
+ test_values ["factor_ar" ] = rng .normal (size = (k_factors , factor_order * k_factors ))
115
+
116
+ if error_order > 0 :
117
+ test_values ["error_ar" ] = rng .normal (size = (data .shape [1 ], error_order ))
127
118
128
- # Convert flat param dict to PyTensor variables as needed
129
- # Reshape factor_ar and error_ar parameters according to model expected shapes
130
- factor_ar_shape = (k_factors , factor_order * k_factors )
131
- error_ar_shape = (data .shape [1 ], error_order ) if error_order > 0 else (0 ,)
119
+ test_values ["error_sigma" ] = rng .beta (1 , 1 , size = data .shape [1 ])
132
120
133
- # Prepare parameter arrays to set as deterministic
134
- # Extract each group of parameters by name pattern (simplified)
135
- factor_loadings = np .array ([param_dict [p ] for p in param_names if "loading" in p ]).reshape (
136
- (data .shape [1 ], k_factors )
121
+ # Convert to statsmodels format
122
+ sm_test_values = create_sm_test_values_mapping (
123
+ test_values , data , k_factors , factor_order , error_order
137
124
)
138
125
139
- # Handle factor_ar parameters - need to account for different factor orders
140
- factor_ar_params = []
126
+ # Initialize and constrain statsmodels model
127
+ x0 = test_values ["x0" ]
128
+ P0 = test_values ["P0" ]
141
129
142
- for factor_idx in range (1 , k_factors + 1 ):
143
- for lag in range (1 , factor_order + 1 ):
144
- for factor_idx2 in range (1 , k_factors + 1 ):
145
- param_pattern = f"L{ lag } .f{ factor_idx2 } .f{ factor_idx } "
146
- if param_pattern in param_names :
147
- factor_ar_params .append (param_pattern )
130
+ sm_dfm .initialize_known (initial_state = x0 , initial_state_cov = P0 )
131
+ sm_dfm .fit_constrained ({name : sm_test_values [name ] for name in sm_dfm .param_names })
148
132
149
- if len (factor_ar_params ) > 0 :
150
- factor_ar_values = [param_dict [p ] for p in factor_ar_params ]
151
- factor_ar = np .array (factor_ar_values ).reshape (factor_ar_shape )
152
- else :
153
- factor_ar = np .zeros (factor_ar_shape )
133
+ # Get PyMC matrices using the same pattern as ETS test
134
+ matrices = mod ._unpack_statespace_with_placeholders ()
135
+ inputs = list (explicit_graph_inputs (matrices ))
136
+ input_names = [x .name for x in inputs ]
154
137
155
- # factor_sigma = np.array([param_dict[p] for p in param_names if "factor.sigma" in p])
138
+ f_matrices = pytensor .function (inputs , matrices )
139
+ test_values_subset = {name : test_values [name ] for name in input_names }
156
140
157
- # Handle error AR parameters - need to account for different error orders and variables
158
- if error_order > 0 :
159
- error_ar_params = []
160
- var_names = [col for col in data .columns ] # Get variable names from data
161
-
162
- # Order parameters by variable first, then by lag to match expected shape (n_vars, n_lags)
163
- for var_name in var_names :
164
- for lag in range (1 , error_order + 1 ):
165
- param_pattern = f"L{ lag } .e({ var_name } ).e({ var_name } )"
166
- if param_pattern in param_names :
167
- error_ar_params .append (param_pattern )
168
-
169
- if len (error_ar_params ) > 0 :
170
- error_ar_values = [param_dict [p ] for p in error_ar_params ]
171
- error_ar = np .array (error_ar_values ).reshape (error_ar_shape )
172
- else :
173
- error_ar = np .zeros (error_ar_shape )
174
-
175
- # Handle observation error variances - look for sigma2 pattern
176
- sigma_obs_params = [p for p in param_names if "sigma2." in p ]
177
- sigma_obs = np .array ([param_dict [p ] for p in sigma_obs_params ])
178
-
179
- # Handle error variances (if needed separately from sigma_obs)
180
- if error_order > 0 :
181
- error_sigma = sigma_obs # In this case, error_sigma is the same as sigma_obs
141
+ pymc_matrices = f_matrices (** test_values_subset )
182
142
183
- coords = mod .coords
184
- with pm .Model (coords = coords ) as model :
185
- k_states = k_factors * max (1 , factor_order ) + (
186
- error_order * data .shape [1 ] if error_order > 0 else 0
187
- )
188
- pm .Deterministic ("x0" , pt .zeros (k_states , dtype = floatX ))
189
- pm .Deterministic ("P0" , pt .eye (k_states , dtype = floatX ))
190
- # Set deterministic variables with constrained parameter values
191
- pm .Deterministic ("factor_loadings" , pt .as_tensor_variable (factor_loadings ))
192
- if factor_order > 0 :
193
- pm .Deterministic ("factor_ar" , pt .as_tensor_variable (factor_ar ))
194
- # pm.Deterministic("factor_sigma", pt.as_tensor_variable(factor_sigma))
195
- if error_order > 0 :
196
- pm .Deterministic ("error_ar" , pt .as_tensor_variable (error_ar ))
197
- pm .Deterministic ("error_sigma" , pt .as_tensor_variable (error_sigma ))
198
- pm .Deterministic ("sigma_obs" , pt .as_tensor_variable (sigma_obs ))
199
-
200
- mod ._insert_random_variables ()
201
-
202
- # Draw the substituted state-space matrices from PyMC model
203
- matrices = pm .draw (mod .subbed_ssm )
204
- matrix_dict = dict (zip (SHORT_NAME_TO_LONG .values (), matrices ))
205
-
206
- # Matrices to check
207
- matrices_to_check = ["transition" , "selection" , "state_cov" , "obs_cov" , "design" ]
208
-
209
- # Compare matrices from PyMC and Statsmodels
210
- for mat_name in matrices_to_check :
211
- sm_mat = np .array (sm_dfm .ssm [mat_name ])
212
- pm_mat = matrix_dict [mat_name ]
213
-
214
- assert_allclose (
215
- pm_mat ,
216
- sm_mat ,
217
- atol = 1e-10 ,
218
- err_msg = f"Matrix mismatch: { mat_name } (k_factors={ k_factors } , factor_order={ factor_order } , error_order={ error_order } )" ,
219
- )
143
+ sm_matrices = [sm_dfm .ssm [name ] for name in LONG_MATRIX_NAMES [2 :]]
144
+
145
+ # Compare matrices (skip x0 and P0)
146
+ for matrix , sm_matrix , name in zip (pymc_matrices [2 :], sm_matrices , LONG_MATRIX_NAMES [2 :]):
147
+ assert_allclose (matrix , sm_matrix , err_msg = f"{ name } does not match" )
0 commit comments