1
1
import numpy as np
2
-
3
- from scipy import linalg
2
+ import pytensor .tensor as pt
4
3
5
4
from pymc_extras .statespace .models .structural .core import Component
6
5
from pymc_extras .statespace .models .structural .utils import order_to_mask
@@ -13,7 +12,6 @@ class LevelTrendComponent(Component):
13
12
14
13
Parameters
15
14
----------
16
- __________
17
15
order : int
18
16
19
17
Number of time derivatives of the trend to include in the model. For example, when order=3, the trend will
@@ -114,7 +112,7 @@ def __init__(
114
112
self ,
115
113
order : int | list [int ] = 2 ,
116
114
innovations_order : int | list [int ] | None = None ,
117
- name : str = "LevelTrend " ,
115
+ name : str = "level_trend " ,
118
116
observed_state_names : list [str ] | None = None ,
119
117
):
120
118
if innovations_order is None :
@@ -166,37 +164,44 @@ def populate_component_properties(self):
166
164
k_posdef = self .k_posdef // k_endog
167
165
168
166
name_slice = POSITION_DERIVATIVE_NAMES [:k_states ]
169
- self .param_names = [f"initial_ { self .name } " ]
167
+ self .param_names = [f"{ self .name } _initial " ]
170
168
base_names = [name for name , mask in zip (name_slice , self ._order_mask ) if mask ]
171
169
self .state_names = [
172
170
f"{ name } [{ obs_name } ]" for obs_name in self .observed_state_names for name in base_names
173
171
]
174
- self .param_dims = {f"initial_ { self .name } " : (f"{ self .name } _state" ,)}
172
+ self .param_dims = {f"{ self .name } _initial " : (f"{ self .name } _state" ,)}
175
173
self .coords = {f"{ self .name } _state" : base_names }
176
174
177
175
if k_endog > 1 :
178
176
self .param_dims [f"{ self .name } _state" ] = (
179
177
f"{ self .name } _endog" ,
180
178
f"{ self .name } _state" ,
181
179
)
182
- self .param_dims = {f"initial_ { self .name } " : (f"{ self .name } _endog" , f"{ self .name } _state" )}
180
+ self .param_dims = {f"{ self .name } _initial " : (f"{ self .name } _endog" , f"{ self .name } _state" )}
183
181
self .coords [f"{ self .name } _endog" ] = self .observed_state_names
184
182
185
183
shape = (k_endog , k_states ) if k_endog > 1 else (k_states ,)
186
- self .param_info = {f"initial_ { self .name } " : {"shape" : shape , "constraints" : None }}
184
+ self .param_info = {f"{ self .name } _initial " : {"shape" : shape , "constraints" : None }}
187
185
188
186
if self .k_posdef > 0 :
189
- self .param_names += [f"sigma_{ self .name } " ]
190
- self .shock_names = [
187
+ self .param_names += [f"{ self .name } _sigma" ]
188
+
189
+ shock_base_names = [
191
190
name for name , mask in zip (name_slice , self .innovations_order ) if mask
192
191
]
193
- self .param_dims [f"sigma_{ self .name } " ] = (
192
+ self .shock_names = [
193
+ f"{ name } [{ obs_name } ]"
194
+ for obs_name in self .observed_state_names
195
+ for name in shock_base_names
196
+ ]
197
+
198
+ self .param_dims [f"{ self .name } _sigma" ] = (
194
199
(f"{ self .name } _shock" ,)
195
200
if k_endog == 1
196
201
else (f"{ self .name } _endog" , f"{ self .name } _shock" )
197
202
)
198
203
self .coords [f"{ self .name } _shock" ] = self .shock_names
199
- self .param_info [f"sigma_ { self .name } " ] = {
204
+ self .param_info [f"{ self .name } _sigma " ] = {
200
205
"shape" : (k_posdef ,) if k_endog == 1 else (k_endog , k_posdef ),
201
206
"constraints" : "Positive" ,
202
207
}
@@ -210,28 +215,34 @@ def make_symbolic_graph(self) -> None:
210
215
k_posdef = self .k_posdef // k_endog
211
216
212
217
initial_trend = self .make_and_register_variable (
213
- f"initial_ { self .name } " ,
218
+ f"{ self .name } _initial " ,
214
219
shape = (k_states ,) if k_endog == 1 else (k_endog , k_states ),
215
220
)
216
221
self .ssm ["initial_state" , :] = initial_trend .ravel ()
217
222
218
- triu_idx = np .triu_indices (k_states )
219
- T = np .zeros ((k_states , k_states ))
220
- T [triu_idx [0 ], triu_idx [1 ]] = 1
223
+ triu_idx = pt .triu_indices (k_states )
224
+ T = pt .zeros ((k_states , k_states ))[triu_idx [0 ], triu_idx [1 ]].set (1 )
221
225
222
- self .ssm ["transition" ] = linalg .block_diag (* [T for _ in range (k_endog )])
226
+ self .ssm ["transition" , :, :] = pt .specify_shape (
227
+ pt .linalg .block_diag (* [T for _ in range (k_endog )]), (self .k_states , self .k_states )
228
+ )
223
229
224
230
R = np .eye (k_states )
225
231
R = R [:, self .innovations_order ]
226
232
227
- self .ssm ["selection" , :, :] = linalg .block_diag (* [R for _ in range (k_endog )])
233
+ self .ssm ["selection" , :, :] = pt .specify_shape (
234
+ pt .linalg .block_diag (* [R for _ in range (k_endog )]), (self .k_states , self .k_posdef )
235
+ )
228
236
229
237
Z = np .array ([1.0 ] + [0.0 ] * (k_states - 1 )).reshape ((1 , - 1 ))
230
- self .ssm ["design" ] = linalg .block_diag (* [Z for _ in range (k_endog )])
238
+
239
+ self .ssm ["design" , :, :] = pt .specify_shape (
240
+ pt .linalg .block_diag (* [Z for _ in range (k_endog )]), (self .k_endog , self .k_states )
241
+ )
231
242
232
243
if k_posdef > 0 :
233
244
sigma_trend = self .make_and_register_variable (
234
- f"sigma_ { self .name } " ,
245
+ f"{ self .name } _sigma " ,
235
246
shape = (k_posdef ,) if k_endog == 1 else (k_endog , k_posdef ),
236
247
)
237
248
diag_idx = np .diag_indices (k_posdef * k_endog )
0 commit comments