1+ from itertools import product
2+
13import numpy as np
24import pandas as pd
35import pymc as pm
79import statsmodels .api as sm
810
911from numpy .testing import assert_allclose
12+ from pytensor .graph .basic import explicit_graph_inputs
1013from statsmodels .tsa .statespace .dynamic_factor import DynamicFactor
1114
1215from pymc_extras .statespace .models .DFM import BayesianDynamicFactor
13- from pymc_extras .statespace .utils .constants import SHORT_NAME_TO_LONG
16+ from pymc_extras .statespace .utils .constants import LONG_MATRIX_NAMES
1417from tests .statespace .shared_fixtures import rng
1518
1619floatX = pytensor .config .floatX
@@ -27,95 +30,65 @@ def data():
2730 return df
2831
2932
30- @pytest .mark .parametrize ("k_factors" , [1 , 2 ])
31- @pytest .mark .parametrize ("factor_order" , [0 , 1 , 2 ])
32- @pytest .mark .parametrize ("error_order" , [0 , 1 , 2 ])
33- def test_dfm_parameter_and_matrix_match (data , k_factors , factor_order , error_order ):
34- # --- Statsmodels DFM ---
35- sm_dfm = DynamicFactor (
36- endog = data ,
37- k_factors = k_factors ,
38- factor_order = factor_order ,
39- error_order = error_order ,
40- )
41-
42- # Use deterministic small parameters for reproducibility
43- param_array = np .full (len (sm_dfm .param_names ), 0.5 )
44- sm_dfm .update (param_array )
45-
46- # Only request matrices that actually exist in ssm.__getitem__
47- valid_names = ["design" , "obs_cov" , "transition" , "state_cov" , "selection" ]
48- sm_matrices = {name : np .array (sm_dfm .ssm [name ]) for name in valid_names }
33+ def create_sm_test_values_mapping (test_values , data , k_factors , factor_order , error_order ):
34+ """Convert PyMC test values to statsmodels parameter format"""
35+ sm_test_values = {}
4936
50- # --- PyMC DFM ---
51- mod = BayesianDynamicFactor (
52- k_factors = k_factors ,
53- factor_order = factor_order ,
54- k_endog = data . shape [ 1 ],
55- error_order = error_order ,
56- measurement_error = False ,
57- verbose = False ,
37+ # 1. Factor loadings: PyMC shape (n_endog, k_factors) -> statsmodels individual params
38+ factor_loadings = test_values [ "factor_loadings" ]
39+ all_pairs = product ( data . columns , range ( 1 , k_factors + 1 ))
40+ sm_test_values . update (
41+ {
42+ f"loading.f { factor_idx } . { endog_name } " : value
43+ for ( endog_name , factor_idx ), value in zip ( all_pairs , factor_loadings . ravel ())
44+ }
5845 )
5946
60- coords = mod .coords
61- with pm .Model (coords = coords ):
62- k_endog = data .shape [1 ]
63- factor_part = max (1 , factor_order ) * k_factors
64- error_part = error_order * k_endog if error_order > 0 else 0
65- k_states = factor_part + error_part
66-
67- pm .Deterministic ("x0" , pt .constant (np .full ((k_states ,), 0.5 ), dtype = floatX ))
68- pm .Deterministic ("P0" , pt .constant (np .full ((k_states , k_states ), 0.5 ), dtype = floatX ))
69- pm .Deterministic (
70- "factor_loadings" , pt .constant (np .full ((k_endog , k_factors ), 0.5 ), dtype = floatX )
47+ # 2. Factor AR coefficients: PyMC shape (k_factors, factor_order*k_factors) -> L{lag}.f{to}.f{from}
48+ if factor_order > 0 and "factor_ar" in test_values :
49+ factor_ar = test_values ["factor_ar" ]
50+ triplets = product (
51+ range (1 , k_factors + 1 ), range (1 , factor_order + 1 ), range (1 , k_factors + 1 )
52+ )
53+ sm_test_values .update (
54+ {
55+ f"L{ lag } .f{ to_factor } .f{ from_factor } " : factor_ar [
56+ from_factor - 1 , (lag - 1 ) * k_factors + (to_factor - 1 )
57+ ]
58+ for from_factor , lag , to_factor in triplets
59+ }
7160 )
7261
73- if factor_order > 0 :
74- pm .Deterministic (
75- "factor_ar" ,
76- pt .constant (np .full ((k_factors , factor_order * k_factors ), 0.5 ), dtype = floatX ),
77- )
78- if error_order > 0 :
79- pm .Deterministic (
80- "error_ar" , pt .constant (np .full ((k_endog , error_order ), 0.5 ), dtype = floatX )
81- )
82- # pm.Deterministic("factor_sigma", pt.constant(np.full((k_factors,), 0.5), dtype=floatX))
83- pm .Deterministic ("error_sigma" , pt .constant (np .full ((k_endog ,), 0.5 ), dtype = floatX ))
84- pm .Deterministic ("sigma_obs" , pt .constant (np .full ((k_endog ,), 0.5 ), dtype = floatX ))
85-
86- mod ._insert_random_variables ()
87-
88- pymc_matrices = pm .draw (mod .subbed_ssm )
89- pymc_matrices = dict (zip (SHORT_NAME_TO_LONG .values (), pymc_matrices ))
90-
91- # --- Compare ---
92- for mat_name in valid_names :
93- assert_allclose (
94- pymc_matrices [mat_name ],
95- sm_matrices [mat_name ],
96- atol = 1e-12 ,
97- err_msg = f"Matrix mismatch: { mat_name } (k_factors={ k_factors } , factor_order={ factor_order } , error_order={ error_order } )" ,
62+ # 3. Error AR coefficients: PyMC shape (n_endog, error_order) -> L{lag}.e(var).e(var)
63+ if error_order > 0 and "error_ar" in test_values :
64+ error_ar = test_values ["error_ar" ]
65+ pairs = product (enumerate (data .columns ), range (1 , error_order + 1 ))
66+ sm_test_values .update (
67+ {
68+ f"L{ lag } .e({ endog_name } ).e({ endog_name } )" : error_ar [endog_idx , lag - 1 ]
69+ for (endog_idx , endog_name ), lag in pairs
70+ }
9871 )
9972
73+ # 4. Observation error variances:
74+ if "error_sigma" in test_values :
75+ error_sigma = test_values ["error_sigma" ]
76+ sm_test_values .update (
77+ {
78+ f"sigma2.{ endog_name } " : error_sigma [endog_idx ]
79+ for endog_idx , endog_name in enumerate (data .columns )
80+ }
81+ )
82+
83+ return sm_test_values
84+
10085
10186@pytest .mark .parametrize ("k_factors" , [1 , 2 ])
10287@pytest .mark .parametrize ("factor_order" , [0 , 1 , 2 ])
103- @pytest .mark .parametrize ("error_order" , [1 , 2 , 3 ])
88+ @pytest .mark .parametrize ("error_order" , [0 , 1 , 2 ])
10489@pytest .mark .filterwarnings ("ignore::statsmodels.tools.sm_exceptions.EstimationWarning" )
10590@pytest .mark .filterwarnings ("ignore::FutureWarning" )
10691def test_DFM_update_matches_statsmodels (data , k_factors , factor_order , error_order , rng ):
107- # --- Fit Statsmodels DynamicFactor with random small params ---
108- sm_dfm = DynamicFactor (
109- endog = data ,
110- k_factors = k_factors ,
111- factor_order = factor_order ,
112- error_order = error_order ,
113- )
114- param_names = sm_dfm .param_names
115- param_dict = {param : getattr (np , floatX )(rng .normal (scale = 0.1 ) ** 2 ) for param in param_names }
116- sm_res = sm_dfm .fit_constrained (param_dict )
117-
118- # --- Setup BayesianDynamicFactor ---
11992 mod = BayesianDynamicFactor (
12093 k_factors = k_factors ,
12194 factor_order = factor_order ,
@@ -124,96 +97,51 @@ def test_DFM_update_matches_statsmodels(data, k_factors, factor_order, error_ord
12497 measurement_error = False ,
12598 verbose = False ,
12699 )
100+ sm_dfm = DynamicFactor (
101+ endog = data ,
102+ k_factors = k_factors ,
103+ factor_order = factor_order ,
104+ error_order = error_order ,
105+ )
106+
107+ # Generate test values for PyMC model
108+ test_values = {}
109+ test_values ["x0" ] = rng .normal (size = mod .k_states )
110+ test_values ["P0" ] = np .eye (mod .k_states ) # Use identity for stability
111+ test_values ["factor_loadings" ] = rng .normal (size = (data .shape [1 ], k_factors ))
112+
113+ if factor_order > 0 :
114+ test_values ["factor_ar" ] = rng .normal (size = (k_factors , factor_order * k_factors ))
115+
116+ if error_order > 0 :
117+ test_values ["error_ar" ] = rng .normal (size = (data .shape [1 ], error_order ))
127118
128- # Convert flat param dict to PyTensor variables as needed
129- # Reshape factor_ar and error_ar parameters according to model expected shapes
130- factor_ar_shape = (k_factors , factor_order * k_factors )
131- error_ar_shape = (data .shape [1 ], error_order ) if error_order > 0 else (0 ,)
119+ test_values ["error_sigma" ] = rng .beta (1 , 1 , size = data .shape [1 ])
132120
133- # Prepare parameter arrays to set as deterministic
134- # Extract each group of parameters by name pattern (simplified)
135- factor_loadings = np .array ([param_dict [p ] for p in param_names if "loading" in p ]).reshape (
136- (data .shape [1 ], k_factors )
121+ # Convert to statsmodels format
122+ sm_test_values = create_sm_test_values_mapping (
123+ test_values , data , k_factors , factor_order , error_order
137124 )
138125
139- # Handle factor_ar parameters - need to account for different factor orders
140- factor_ar_params = []
126+ # Initialize and constrain statsmodels model
127+ x0 = test_values ["x0" ]
128+ P0 = test_values ["P0" ]
141129
142- for factor_idx in range (1 , k_factors + 1 ):
143- for lag in range (1 , factor_order + 1 ):
144- for factor_idx2 in range (1 , k_factors + 1 ):
145- param_pattern = f"L{ lag } .f{ factor_idx2 } .f{ factor_idx } "
146- if param_pattern in param_names :
147- factor_ar_params .append (param_pattern )
130+ sm_dfm .initialize_known (initial_state = x0 , initial_state_cov = P0 )
131+ sm_dfm .fit_constrained ({name : sm_test_values [name ] for name in sm_dfm .param_names })
148132
149- if len (factor_ar_params ) > 0 :
150- factor_ar_values = [param_dict [p ] for p in factor_ar_params ]
151- factor_ar = np .array (factor_ar_values ).reshape (factor_ar_shape )
152- else :
153- factor_ar = np .zeros (factor_ar_shape )
133+ # Get PyMC matrices using the same pattern as ETS test
134+ matrices = mod ._unpack_statespace_with_placeholders ()
135+ inputs = list (explicit_graph_inputs (matrices ))
136+ input_names = [x .name for x in inputs ]
154137
155- # factor_sigma = np.array([param_dict[p] for p in param_names if "factor.sigma" in p])
138+ f_matrices = pytensor .function (inputs , matrices )
139+ test_values_subset = {name : test_values [name ] for name in input_names }
156140
157- # Handle error AR parameters - need to account for different error orders and variables
158- if error_order > 0 :
159- error_ar_params = []
160- var_names = [col for col in data .columns ] # Get variable names from data
161-
162- # Order parameters by variable first, then by lag to match expected shape (n_vars, n_lags)
163- for var_name in var_names :
164- for lag in range (1 , error_order + 1 ):
165- param_pattern = f"L{ lag } .e({ var_name } ).e({ var_name } )"
166- if param_pattern in param_names :
167- error_ar_params .append (param_pattern )
168-
169- if len (error_ar_params ) > 0 :
170- error_ar_values = [param_dict [p ] for p in error_ar_params ]
171- error_ar = np .array (error_ar_values ).reshape (error_ar_shape )
172- else :
173- error_ar = np .zeros (error_ar_shape )
174-
175- # Handle observation error variances - look for sigma2 pattern
176- sigma_obs_params = [p for p in param_names if "sigma2." in p ]
177- sigma_obs = np .array ([param_dict [p ] for p in sigma_obs_params ])
178-
179- # Handle error variances (if needed separately from sigma_obs)
180- if error_order > 0 :
181- error_sigma = sigma_obs # In this case, error_sigma is the same as sigma_obs
141+ pymc_matrices = f_matrices (** test_values_subset )
182142
183- coords = mod .coords
184- with pm .Model (coords = coords ) as model :
185- k_states = k_factors * max (1 , factor_order ) + (
186- error_order * data .shape [1 ] if error_order > 0 else 0
187- )
188- pm .Deterministic ("x0" , pt .zeros (k_states , dtype = floatX ))
189- pm .Deterministic ("P0" , pt .eye (k_states , dtype = floatX ))
190- # Set deterministic variables with constrained parameter values
191- pm .Deterministic ("factor_loadings" , pt .as_tensor_variable (factor_loadings ))
192- if factor_order > 0 :
193- pm .Deterministic ("factor_ar" , pt .as_tensor_variable (factor_ar ))
194- # pm.Deterministic("factor_sigma", pt.as_tensor_variable(factor_sigma))
195- if error_order > 0 :
196- pm .Deterministic ("error_ar" , pt .as_tensor_variable (error_ar ))
197- pm .Deterministic ("error_sigma" , pt .as_tensor_variable (error_sigma ))
198- pm .Deterministic ("sigma_obs" , pt .as_tensor_variable (sigma_obs ))
199-
200- mod ._insert_random_variables ()
201-
202- # Draw the substituted state-space matrices from PyMC model
203- matrices = pm .draw (mod .subbed_ssm )
204- matrix_dict = dict (zip (SHORT_NAME_TO_LONG .values (), matrices ))
205-
206- # Matrices to check
207- matrices_to_check = ["transition" , "selection" , "state_cov" , "obs_cov" , "design" ]
208-
209- # Compare matrices from PyMC and Statsmodels
210- for mat_name in matrices_to_check :
211- sm_mat = np .array (sm_dfm .ssm [mat_name ])
212- pm_mat = matrix_dict [mat_name ]
213-
214- assert_allclose (
215- pm_mat ,
216- sm_mat ,
217- atol = 1e-10 ,
218- err_msg = f"Matrix mismatch: { mat_name } (k_factors={ k_factors } , factor_order={ factor_order } , error_order={ error_order } )" ,
219- )
143+ sm_matrices = [sm_dfm .ssm [name ] for name in LONG_MATRIX_NAMES [2 :]]
144+
145+ # Compare matrices (skip x0 and P0)
146+ for matrix , sm_matrix , name in zip (pymc_matrices [2 :], sm_matrices , LONG_MATRIX_NAMES [2 :]):
147+ assert_allclose (matrix , sm_matrix , err_msg = f"{ name } does not match" )
0 commit comments