Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 238 additions & 57 deletions pymc_extras/statespace/models/VARMAX.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
ALL_STATE_AUX_DIM,
ALL_STATE_DIM,
AR_PARAM_DIM,
EXOGENOUS_DIM,
MA_PARAM_DIM,
OBS_STATE_AUX_DIM,
OBS_STATE_DIM,
SHOCK_AUX_DIM,
SHOCK_DIM,
TIME_DIM,
)

floatX = pytensor.config.floatX
Expand All @@ -28,60 +30,6 @@ class BayesianVARMAX(PyMCStateSpace):
r"""
Vector AutoRegressive Moving Average with eXogenous Regressors

Parameters
----------
order: tuple of (int, int)
Number of autoregressive (AR) and moving average (MA) terms to include in the model. All terms up to the
specified order are included. For restricted models, set zeros directly on the priors.

endog_names: list of str, optional
Names of the endogenous variables being modeled. Used to generate names for the state and shock coords. If
None, the state names will simply be numbered.

Exactly one of either ``endog_names`` or ``k_endog`` must be specified.

k_endog: int, optional
Number of endogenous states to be modeled.

Exactly one of either ``endog_names`` or ``k_endog`` must be specified.

stationary_initialization: bool, default False
If true, the initial state and initial state covariance will not be assigned priors. Instead, their steady
state values will be used. If False, the user is responsible for setting priors on the initial state and
initial covariance.

..warning :: This option is very sensitive to the priors placed on the AR and MA parameters. If the model dynamics
for a given sample are not stationary, sampling will fail with a "covariance is not positive semi-definite"
error.

filter_type: str, default "standard"
The type of Kalman Filter to use. Options are "standard", "single", "univariate", "steady_state",
and "cholesky". See the docs for kalman filters for more details.

state_structure: str, default "fast"
How to represent the state-space system. When "interpretable", each element of the state vector will have a
precise meaning as either lagged data, innovations, or lagged innovations. This comes at the cost of a larger
state vector, which may hurt performance.

When "fast", states are combined to minimize the dimension of the state vector, but lags and innovations are
mixed together as a result. Only the first state (the modeled timeseries) will have an obvious interpretation
in this case.

measurement_error: bool, default True
If true, a measurement error term is added to the model.

verbose: bool, default True
If true, a message will be logged to the terminal explaining the variable names, dimensions, and supports.

mode: str or Mode, optional
Pytensor compile mode, used in auxiliary sampling methods such as ``sample_conditional_posterior`` and
``forecast``. The mode does **not** effect calls to ``pm.sample``.

Regardless of whether a mode is specified, it can always be overwritten via the ``compile_kwargs`` argument
to all sampling methods.

Notes
-----
The VARMA model is a multivariate extension of the SARIMAX model. Given a set of timeseries :math:`\{x_t\}_{t=0}^T`,
with :math:`x_t = \begin{bmatrix} x_{1,t} & x_{2,t} & \cdots & x_{k,t} \end{bmatrix}^T`, a VARMA models each series
as a function of the histories of all series. Specifically, denoting the AR-MA order as (p, q), a VARMA can be
Expand Down Expand Up @@ -152,23 +100,133 @@ def __init__(
order: tuple[int, int],
endog_names: list[str] | None = None,
k_endog: int | None = None,
exog_state_names: list[str] | dict[str, list[str]] | None = None,
k_exog: int | dict[str, int] | None = None,
stationary_initialization: bool = False,
filter_type: str = "standard",
measurement_error: bool = False,
verbose: bool = True,
mode: str | Mode | None = None,
):
"""
Create a Bayesian VARMAX model.

Parameters
----------
order: tuple of (int, int)
Number of autoregressive (AR) and moving average (MA) terms to include in the model. All terms up to the
specified order are included. For restricted models, set zeros directly on the priors.

endog_names: list of str, optional
Names of the endogenous variables being modeled. Used to generate names for the state and shock coords. If
None, the state names will simply be numbered.

Exactly one of either ``endog_names`` or ``k_endog`` must be specified.

exog_state_names : list[str] or dict[str, list[str]], optional
Names of the exogenous state variables. If a list, all endogenous variables will share the same exogenous
variables. If a dict, keys should be the names of the endogenous variables, and values should be lists of the
exogenous variable names for that endogenous variable. Endogenous variables not included in the dict will
be assumed to have no exogenous variables. If None, no exogenous variables will be included.

k_exog : int or dict[str, int], optional
Number of exogenous variables. If an int, all endogenous variables will share the same number of exogenous
variables. If a dict, keys should be the names of the endogenous variables, and values should be the number of
exogenous variables for that endogenous variable. Endogenous variables not included in the dict will be
assumed to have no exogenous variables. If None, no exogenous variables will be included.

stationary_initialization: bool, default False
If true, the initial state and initial state covariance will not be assigned priors. Instead, their steady
state values will be used. If False, the user is responsible for setting priors on the initial state and
initial covariance.

.. warning:: This option is very sensitive to the priors placed on the AR and MA parameters. If the model dynamics
for a given sample are not stationary, sampling will fail with a "covariance is not positive semi-definite"
error.

filter_type: str, default "standard"
The type of Kalman Filter to use. Options are "standard", "single", "univariate", "steady_state",
and "cholesky". See the docs for kalman filters for more details.

state_structure: str, default "fast"
How to represent the state-space system. When "interpretable", each element of the state vector will have a
precise meaning as either lagged data, innovations, or lagged innovations. This comes at the cost of a larger
state vector, which may hurt performance.

When "fast", states are combined to minimize the dimension of the state vector, but lags and innovations are
mixed together as a result. Only the first state (the modeled timeseries) will have an obvious interpretation
in this case.

measurement_error: bool, default True
If true, a measurement error term is added to the model.

verbose: bool, default True
If true, a message will be logged to the terminal explaining the variable names, dimensions, and supports.

mode: str or Mode, optional
Pytensor compile mode, used in auxiliary sampling methods such as ``sample_conditional_posterior`` and
``forecast``. The mode does **not** affect calls to ``pm.sample``.

Regardless of whether a mode is specified, it can always be overwritten via the ``compile_kwargs`` argument
to all sampling methods.

"""
# --- Endogenous specification: exactly one of endog_names / k_endog may be
# omitted; the other is derived from it. ---
if (endog_names is None) and (k_endog is None):
    raise ValueError("Must specify either endog_names or k_endog")
if (endog_names is not None) and (k_endog is None):
    k_endog = len(endog_names)
if (endog_names is None) and (k_endog is not None):
    endog_names = [f"observed_{i}" for i in range(k_endog)]
if (endog_names is not None) and (k_endog is not None):
    if len(endog_names) != k_endog:
        raise ValueError("Length of provided endog_names does not match provided k_endog")

# BUG FIX: this check previously tested isinstance(k_endog, ...) and named
# k_endog in the message, but the argument being validated here is k_exog.
if k_exog is not None and not isinstance(k_exog, int | dict):
    raise ValueError("If not None, k_exog must be either an int or a dict")
if exog_state_names is not None and not isinstance(exog_state_names, list | dict):
    raise ValueError("If not None, exog_state_names must be either a list or a dict")

# --- Cross-validate k_exog against exog_state_names when both are given ---
if k_exog is not None and exog_state_names is not None:
    if isinstance(k_exog, int) and isinstance(exog_state_names, list):
        if len(exog_state_names) != k_exog:
            raise ValueError("Length of exog_state_names does not match provided k_exog")
    elif isinstance(k_exog, int) and isinstance(exog_state_names, dict):
        raise ValueError(
            "If k_exog is an int, exog_state_names must be a list of the same length (or None)"
        )
    elif isinstance(k_exog, dict) and isinstance(exog_state_names, list):
        raise ValueError(
            "If k_exog is a dict, exog_state_names must be a dict as well (or None)"
        )
    elif isinstance(k_exog, dict) and isinstance(exog_state_names, dict):
        if set(k_exog.keys()) != set(exog_state_names.keys()):
            raise ValueError("Keys of k_exog and exog_state_names dicts must match")
        # BUG FIX: pair entries by key, not by position. Two dicts with equal
        # key sets can have different insertion orders, so zipping .values()
        # could compare a name list against the count for a different state.
        if not all(len(exog_state_names[name]) == k for name, k in k_exog.items()):
            raise ValueError(
                "If both k_exog and exog_state_names are provided, lengths of exog_state_names "
                "lists must match corresponding values in k_exog"
            )

# --- Fill in whichever of k_exog / exog_state_names was omitted ---
if k_exog is not None and exog_state_names is None:
    if isinstance(k_exog, int):
        exog_state_names = [f"exogenous_{i}" for i in range(k_exog)]
    elif isinstance(k_exog, dict):
        exog_state_names = {
            name: [f"{name}_exogenous_{i}" for i in range(k)] for name, k in k_exog.items()
        }

if k_exog is None and exog_state_names is not None:
    if isinstance(exog_state_names, list):
        k_exog = len(exog_state_names)
    elif isinstance(exog_state_names, dict):
        k_exog = {name: len(names) for name, names in exog_state_names.items()}

self.endog_names = list(endog_names)
self.exog_state_names = exog_state_names

self.k_exog = k_exog
self.p, self.q = order
self.stationary_initialization = stationary_initialization

Expand Down Expand Up @@ -208,6 +266,14 @@ def param_names(self):
names.remove("ar_params")
if self.q == 0:
names.remove("ma_params")

# Add exogenous regression coefficients rather than remove, since we might have to handle
# several (if self.exog_state_names is a dict)
if isinstance(self.exog_state_names, list):
names.append("beta_exog")
elif isinstance(self.exog_state_names, dict):
names.extend([f"beta_{name}" for name in self.exog_state_names.keys()])

return names

@property
Expand Down Expand Up @@ -239,19 +305,65 @@ def param_info(self) -> dict[str, dict[str, Any]]:
},
}

if isinstance(self.exog_state_names, list):
k_exog = len(self.exog_state_names)
info["beta_exog"] = {
"shape": (self.k_endog, k_exog),
"constraints": "None",
}

elif isinstance(self.exog_state_names, dict):
for name, exog_names in self.exog_state_names.items():
k_exog = len(exog_names)
info[f"beta_{name}"] = {
"shape": (k_exog,),
"constraints": "None",
}

for name in self.param_names:
info[name]["dims"] = self.param_dims[name]

return {name: info[name] for name in self.param_names}

@property
def data_info(self) -> dict[str, dict[str, Any]]:
    """
    Information about the exogenous data expected by the model.

    Returns a mapping from data-variable name to its expected ``dims`` and
    ``shape``, or ``None`` when the model was built without exogenous
    variables.
    """
    if isinstance(self.exog_state_names, list):
        # Single shared design matrix: one data variable for all series.
        return {
            "exogenous_data": {
                "dims": (TIME_DIM, EXOGENOUS_DIM),
                "shape": (None, self.k_exog),
            }
        }

    if isinstance(self.exog_state_names, dict):
        # Per-series regressors: one data variable and one coordinate
        # dimension per endogenous state that has exogenous inputs.
        return {
            f"{endog_state}_exogenous_data": {
                "dims": (TIME_DIM, f"{EXOGENOUS_DIM}_{endog_state}"),
                "shape": (None, len(exog_names)),
            }
            for endog_state, exog_names in self.exog_state_names.items()
        }

    # No exogenous variables configured.
    return None

@property
def data_names(self) -> list[str]:
    """Names of the exogenous data variables the model expects, if any."""
    exog = self.exog_state_names
    if isinstance(exog, dict):
        # One data variable per endogenous series with its own regressors.
        return [f"{endog_state}_exogenous_data" for endog_state in exog]
    if isinstance(exog, list):
        # A single shared design matrix.
        return ["exogenous_data"]
    return []

@property
def state_names(self):
    """
    Names of the hidden states: the observed series first, then their lags,
    then lagged innovation terms.
    """
    # NOTE: the diff view contained both the old "." separated and new "_"
    # separated name formats; the merged version uses underscores.
    state_names = self.endog_names.copy()
    # Lagged observed states: L1_<series>, ..., L{p-1}_<series>.
    state_names += [
        f"L{i + 1}_{state}" for i in range(self.p - 1) for state in self.endog_names
    ]
    # Lagged innovations: L1_<series>_innov, ..., L{q}_<series>_innov.
    state_names += [
        f"L{i + 1}_{state}_innov" for i in range(self.q) for state in self.endog_names
    ]

    return state_names
Expand All @@ -276,6 +388,12 @@ def coords(self) -> dict[str, Sequence]:
if self.q > 0:
coords.update({MA_PARAM_DIM: list(range(1, self.q + 1))})

if isinstance(self.exog_state_names, list):
coords[EXOGENOUS_DIM] = self.exog_state_names
elif isinstance(self.exog_state_names, dict):
for name, exog_names in self.exog_state_names.items():
coords[f"{EXOGENOUS_DIM}_{name}"] = exog_names

return coords

@property
Expand All @@ -299,6 +417,14 @@ def param_dims(self):
del coord_map["P0"]
del coord_map["x0"]

if isinstance(self.exog_state_names, list):
coord_map["beta_exog"] = (OBS_STATE_DIM, EXOGENOUS_DIM)
elif isinstance(self.exog_state_names, dict):
# If each state has its own exogenous variables, each parameter needs its own dim, since we expect the
# dim labels to all be different (otherwise we'd be in the list case).
for name in self.exog_state_names.keys():
coord_map[f"beta_{name}"] = (f"{EXOGENOUS_DIM}_{name}",)

return coord_map

def add_default_priors(self):
Expand Down Expand Up @@ -386,6 +512,61 @@ def make_symbolic_graph(self) -> None:
)
self.ssm["state_cov", :, :] = state_cov

if self.exog_state_names is not None:
if isinstance(self.exog_state_names, list):
beta_exog = self.make_and_register_variable(
"beta_exog", shape=(self.k_posdef, self.k_exog), dtype=floatX
)
exog_data = self.make_and_register_data(
"exogenous_data", shape=(None, self.k_exog), dtype=floatX
)

obs_intercept = exog_data @ beta_exog.T

elif isinstance(self.exog_state_names, dict):
obs_components = []
for i, name in enumerate(self.endog_names):
if name in self.exog_state_names:
k_exog = len(self.exog_state_names[name])
beta_exog = self.make_and_register_variable(
f"beta_{name}", shape=(k_exog,), dtype=floatX
)
exog_data = self.make_and_register_data(
f"{name}_exogenous_data", shape=(None, k_exog), dtype=floatX
)
obs_components.append(pt.expand_dims(exog_data @ beta_exog, axis=-1))
else:
obs_components.append(pt.zeros((1, 1), dtype=floatX))

# TODO: Replace all of this with pt.concat_with_broadcast once PyMC works with pytensor >= 2.32

# If there were any zeros, they need to be broadcast against the non-zeros.
# Core shape is the last dim, the time dim is always broadcast
non_concat_shape = [1, None]

# Look for the first non-zero component to get the shape from
for tensor_inp in obs_components:
for i, (bcast, sh) in enumerate(
zip(tensor_inp.type.broadcastable, tensor_inp.shape)
):
if bcast or i == 1:
continue
non_concat_shape[i] = sh

assert non_concat_shape.count(None) == 1

bcast_tensor_inputs = []
for tensor_inp in obs_components:
non_concat_shape[1] = tensor_inp.shape[1]
bcast_tensor_inputs.append(pt.broadcast_to(tensor_inp, non_concat_shape))

obs_intercept = pt.join(1, *bcast_tensor_inputs)

else:
raise NotImplementedError()

self.ssm["obs_intercept"] = obs_intercept

if self.stationary_initialization:
# Solve for matrix quadratic for P0
T = self.ssm["transition"]
Expand Down
Loading
Loading