From 29b6ec084fb49a8adb2bf5a7b6ebbd8540250e39 Mon Sep 17 00:00:00 2001 From: rickvandervliet <38972062+rickvandervliet@users.noreply.github.com> Date: Tue, 4 Nov 2025 16:40:35 +0100 Subject: [PATCH] Restore edits --- pymc_extras/statespace/core/statespace.py | 434 ++++++++++++++++----- pymc_extras/statespace/utils/data_tools.py | 73 +++- 2 files changed, 410 insertions(+), 97 deletions(-) diff --git a/pymc_extras/statespace/core/statespace.py b/pymc_extras/statespace/core/statespace.py index 266e92f74..87f119067 100644 --- a/pymc_extras/statespace/core/statespace.py +++ b/pymc_extras/statespace/core/statespace.py @@ -66,7 +66,9 @@ def _validate_filter_arg(filter_arg): def _verify_group(group): if group not in ["prior", "posterior"]: - raise ValueError(f'Argument "group" must be one of "prior" or "posterior", found {group}') + raise ValueError( + f'Argument "group" must be one of "prior" or "posterior", found {group}' + ) class PyMCStateSpace: @@ -106,6 +108,11 @@ class PyMCStateSpace: Regardless of whether a mode is specified, it can always be overwritten via the ``compile_kwargs`` argument to all sampling methods. + name : str, optional + A name for this state space model instance. If provided, all variables registered in the PyMC model + will be prefixed with this name (e.g., "model1_x0", "model1_P0"). This allows multiple state space + models to coexist in a single PyMC model without variable name conflicts. Default is None. + Notes ----- Based on the statsmodels statespace implementation https://github.com/statsmodels/statsmodels/blob/main/statsmodels/tsa/statespace/representation.py, @@ -229,6 +236,7 @@ def __init__( verbose: bool = True, measurement_error: bool = False, mode: str | None = None, + name: str | None = None, ): self._fit_coords: dict[str, Sequence[str]] | None = None self._fit_dims: dict[str, Sequence[str]] | None = None @@ -244,6 +252,7 @@ def __init__( self.k_posdef = k_posdef self.measurement_error = measurement_error self.mode = mode + self.name = name # All models contain a state space representation and a Kalman filter self.ssm = PytensorRepresentation(k_endog, k_states, k_posdef) @@ -253,11 +262,14 @@ def __init__( if filter_type.lower() not in FILTER_FACTORY.keys(): raise NotImplementedError( - "The following are valid filter types: " + ", ".join(list(FILTER_FACTORY.keys())) + "The following are valid filter types: " + + ", ".join(list(FILTER_FACTORY.keys())) ) if filter_type == "single" and self.k_endog > 1: - raise ValueError('Cannot use filter_type = "single" with multiple observed time series') + raise ValueError( + 'Cannot use filter_type = "single" with multiple observed time series' + ) self.kalman_filter = FILTER_FACTORY[filter_type.lower()]() self.kalman_smoother = KalmanSmoother() @@ -271,6 +283,24 @@ def __init__( console = Console() console.print(self.requirement_table) + def _prefix_name(self, var_name: str) -> str: + """ + Add the model name as a prefix to a variable name if the model has a name. + + Parameters + ---------- + var_name : str + The base variable name + + Returns + ------- + str + The prefixed variable name if self.name is set, otherwise the original name + """ + if self.name is None: + return var_name + return f"{self.name}_{var_name}" + def _populate_prior_requirements(self) -> None: """ Add requirements about priors needed for the model to a rich table, including their names, @@ -308,7 +338,9 @@ def _populate_data_requirements(self) -> None: self.requirement_table.add_section() for data, info in self.data_info.items(): - self.requirement_table.add_row(data, str(info["shape"]), "pm.Data", str(info["dims"])) + self.requirement_table.add_row( + data, str(info["shape"]), "pm.Data", str(info["dims"]) + ) def _initialize_requirement_table(self) -> None: self.requirement_table = Table( @@ -435,7 +467,9 @@ def observed_states(self) -> list[str]: """ A k_endog length list of strings, associated with the model's observed states """ - raise NotImplementedError("The observed_states property has not been implemented!") + raise NotImplementedError( + "The observed_states property has not been implemented!" + ) @property def shock_names(self) -> list[str]: @@ -453,7 +487,9 @@ def default_priors(self) -> dict[str, Callable]: Returns a dictionary with param_name: Callable key-value pairs. Used by the ``add_default_priors()`` method to automatically add priors to the PyMC model. """ - raise NotImplementedError("The default_priors property has not been implemented!") + raise NotImplementedError( + "The default_priors property has not been implemented!" + ) @property def coords(self) -> dict[str, Sequence[str]]: @@ -482,7 +518,56 @@ def add_default_priors(self) -> None: """ Add default priors to the active PyMC model context """ - raise NotImplementedError("The add_default_priors property has not been implemented!") + raise NotImplementedError( + "The add_default_priors property has not been implemented!" + ) + + def register_variable(self, name: str, variable: pt.TensorVariable) -> None: + """ + Register an existing pytensor variable in the _name_to_variable dictionary + + Parameters + ---------- + name : str + The name of the variable. Must be the name of a model parameter. + variable : pytensor.TensorVariable + The pytensor variable to register. This should be an existing variable, not a symbolic placeholder. + + Notes + ----- + This method is used to register pre-existing pytensor variables (e.g., those passed to the model constructor) + in the internal ``_name_to_variable`` dictionary. This is different from ``make_and_register_variable``, which + creates new symbolic placeholders. + + The registered variable will be used in the ``_insert_random_variables`` method via ``pytensor.graph_replace`` + to substitute the variable into the computational graph. + + An error is raised if the provided name has already been registered, or if the name is not present in the + ``param_names`` property. + + Examples + -------- + >>> class MyModel(PyMCStateSpace): + ... def __init__(self, A, B, name=None): + ... super().__init__(k_states=1, k_posdef=1, name=name) + ... self.A = A + ... self.B = B + ... self.register_variable("A", self.A) + ... self.register_variable("B", self.B) + """ + if name not in self.param_names: + raise ValueError( + f"{name} is not a model parameter. Valid parameter names are: {self.param_names}" + ) + + if name in self._name_to_variable.keys(): + raise ValueError( + f"{name} is already a registered variable with shape " + f"{self._name_to_variable[name].type.shape}" + ) + + # Store with the unprefixed name as the key - prefixing happens when looking up in PyMC model + self._name_to_variable[name] = variable def make_and_register_variable( self, name, shape: int | tuple[int, ...] | None = None, dtype=floatX @@ -513,6 +598,8 @@ def make_and_register_variable( An error is raised if the provided name has already been registered, or if the name is not present in the ``param_names`` property. + + If the model has a name, the variable will be registered with a prefixed name in the internal dictionary. """ if name not in self.param_names: raise ValueError( @@ -527,6 +614,7 @@ def make_and_register_variable( ) placeholder = pt.tensor(name, shape=shape, dtype=dtype) + # Store with the unprefixed name as the key - prefixing happens when looking up in PyMC model self._name_to_variable[name] = placeholder return placeholder @@ -552,6 +640,8 @@ def make_and_register_data( An error is raised if the provided name has already been registered, or if the name is not present in the ``data_names`` property. + + If the model has a name, the data variable will be registered with a prefixed name in the internal dictionary. """ if name not in self.data_names: raise ValueError( @@ -566,6 +656,7 @@ def make_and_register_data( ) placeholder = pt.tensor(name, shape=shape, dtype=dtype) + # Store with the unprefixed name as the key - prefixing happens when looking up in PyMC model self._name_to_data[name] = placeholder return placeholder @@ -624,9 +715,13 @@ def make_symbolic_graph(self) -> None: self.ssm['selection', 1:, 0] = theta_params self.ssm['state_cov', 0, 0] = sigma """ - raise NotImplementedError("The make_symbolic_statespace method has not been implemented!") + raise NotImplementedError( + "The make_symbolic_statespace method has not been implemented!" + ) - def _get_matrix_shape_and_dims(self, name: str) -> tuple[tuple[int] | None, tuple[str] | None]: + def _get_matrix_shape_and_dims( + self, name: str + ) -> tuple[tuple[int] | None, tuple[str] | None]: """ Get the shape and dimensions of a matrix associated with the specified name. @@ -712,16 +807,26 @@ def _insert_random_variables(self): found_params = [] with pymc_model: for param_name in self.param_names: - param = getattr(pymc_model, param_name, None) + # Look for the parameter with the prefixed name in the PyMC model + prefixed_name = self._prefix_name(param_name) + param = getattr(pymc_model, prefixed_name, None) if param is not None: - found_params.append(param.name) + found_params.append(param_name) missing_params = list(set(self.param_names) - set(found_params)) if len(missing_params) > 0: - raise ValueError( - "The following required model parameters were not found in the PyMC model: " - + ", ".join(missing_params) - ) + # Include information about the prefix in error message + if self.name is not None: + missing_prefixed = [self._prefix_name(p) for p in missing_params] + raise ValueError( + f"The following required model parameters were not found in the PyMC model " + f"(model name: '{self.name}'): " + ", ".join(missing_prefixed) + ) + else: + raise ValueError( + "The following required model parameters were not found in the PyMC model: " + + ", ".join(missing_params) + ) excess_params = list(set(found_params) - set(self.param_names)) if len(excess_params) > 0: @@ -732,7 +837,11 @@ def _insert_random_variables(self): matrices = list(self._unpack_statespace_with_placeholders()) - replacement_dict = {var: pymc_model[name] for name, var in self._name_to_variable.items()} + # Build replacement dict using prefixed names to look up in PyMC model + replacement_dict = { + var: pymc_model[self._prefix_name(name)] + for name, var in self._name_to_variable.items() + } self.subbed_ssm = graph_replace(matrices, replace=replacement_dict, strict=True) def _insert_data_variables(self): @@ -751,19 +860,35 @@ def _insert_data_variables(self): found_data = [] with pymc_model: for data_name in data_names: - data = getattr(pymc_model, data_name, None) + # Look for the data variable with the prefixed name in the PyMC model + prefixed_name = self._prefix_name(data_name) + data = getattr(pymc_model, prefixed_name, None) if data is not None: - found_data.append(data.name) + found_data.append(data_name) missing_data = list(set(data_names) - set(found_data)) if len(missing_data) > 0: - raise ValueError( - "The following required exogenous data were not found in the PyMC model: " - + ", ".join(missing_data) - ) + # Include information about the prefix in error message + if self.name is not None: + missing_prefixed = [self._prefix_name(d) for d in missing_data] + raise ValueError( + f"The following required exogenous data were not found in the PyMC model " + f"(model name: '{self.name}'): " + ", ".join(missing_prefixed) + ) + else: + raise ValueError( + "The following required exogenous data were not found in the PyMC model: " + + ", ".join(missing_data) + ) - replacement_dict = {data: pymc_model[name] for name, data in self._name_to_data.items()} - self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=True) + # Build replacement dict using prefixed names to look up in PyMC model + replacement_dict = { + data: pymc_model[self._prefix_name(name)] + for name, data in self._name_to_data.items() + } + self.subbed_ssm = graph_replace( + self.subbed_ssm, replace=replacement_dict, strict=True + ) def _register_matrices_with_pymc_model(self) -> list[pt.TensorVariable]: """ @@ -782,22 +907,25 @@ def _register_matrices_with_pymc_model(self) -> list[pt.TensorVariable]: registered_matrices = [] for i, (matrix, name) in enumerate(zip(matrices, MATRIX_NAMES)): time_varying_ndim = 2 if name in VECTOR_VALUED else 3 - if not getattr(pm_mod, name, None): + # Use the prefixed name when checking/registering in PyMC model + prefixed_name = self._prefix_name(name) + if not getattr(pm_mod, prefixed_name, None): shape, dims = self._get_matrix_shape_and_dims(name) has_dims = dims is not None if matrix.ndim == time_varying_ndim and has_dims: dims = (TIME_DIM, *dims) - x = pm.Deterministic(name, matrix, dims=dims) + x = pm.Deterministic(prefixed_name, matrix, dims=dims) registered_matrices.append(x) else: registered_matrices.append(matrices[i]) return registered_matrices - @staticmethod - def _register_kalman_filter_outputs_with_pymc_model(outputs: tuple[pt.TensorVariable]) -> None: + def _register_kalman_filter_outputs_with_pymc_model( + self, outputs: tuple[pt.TensorVariable] + ) -> None: mod = modelcontext(None) coords = mod.coords @@ -819,8 +947,12 @@ def _register_kalman_filter_outputs_with_pymc_model(outputs: tuple[pt.TensorVari with mod: for var, name in zip(states + covs, state_names + cov_names): dim_names = FILTER_OUTPUT_DIMS.get(name, None) - dims = tuple([dim if dim in coords.keys() else None for dim in dim_names]) - pm.Deterministic(name, var, dims=dims) + dims = tuple( + [dim if dim in coords.keys() else None for dim in dim_names] + ) + # Use the prefixed name when registering in PyMC model + prefixed_name = self._prefix_name(name) + pm.Deterministic(prefixed_name, var, dims=dims) def build_statespace_graph( self, @@ -918,6 +1050,7 @@ def build_statespace_graph( obs_coords=obs_coords, register_data=register_data, missing_fill_value=missing_fill_value, + data_name=self._prefix_name("data"), ) filter_outputs = self.kalman_filter.build_graph( @@ -939,10 +1072,12 @@ def build_statespace_graph( self._register_kalman_filter_outputs_with_pymc_model(all_kf_outputs) obs_dims = FILTER_OUTPUT_DIMS["predicted_observed_states"] - obs_dims = obs_dims if all([dim in pm_mod.coords.keys() for dim in obs_dims]) else None + obs_dims = ( + obs_dims if all([dim in pm_mod.coords.keys() for dim in obs_dims]) else None + ) SequenceMvNormal( - "obs", + self._prefix_name("obs"), mus=observed_states, covs=observed_covariances, logp=logp, @@ -1048,7 +1183,9 @@ def _kalman_filter_outputs_from_dummy_graph( data: pt.TensorLike | None = None, data_dims: str | tuple[str] | list[str] | None = None, scenario: dict[str, pd.DataFrame] | pd.DataFrame | None = None, - ) -> tuple[list[pt.TensorVariable], list[tuple[pt.TensorVariable, pt.TensorVariable]]]: + ) -> tuple[ + list[pt.TensorVariable], list[tuple[pt.TensorVariable, pt.TensorVariable]] + ]: """ Builds a Kalman filter graph using "dummy" pm.Flat distributions for the model variables and sorts the returns into (mean, covariance) pairs for each of filtered, predicted, and smoothed output. @@ -1102,6 +1239,7 @@ def _kalman_filter_outputs_from_dummy_graph( obs_coords=obs_coords, data_dims=data_dims, register_data=True, + data_name=self._prefix_name("data"), ) filter_outputs = self.kalman_filter.build_graph( @@ -1210,17 +1348,21 @@ def _sample_conditional( state_dims = ( (TIME_DIM, ALL_STATE_DIM) - if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM]]) + if all( + [dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM]] + ) else (None, None) ) obs_dims = ( (TIME_DIM, OBS_STATE_DIM) - if all([dim in self._fit_coords for dim in [TIME_DIM, OBS_STATE_DIM]]) + if all( + [dim in self._fit_coords for dim in [TIME_DIM, OBS_STATE_DIM]] + ) else (None, None) ) SequenceMvNormal( - f"{name}_{group}", + self._prefix_name(f"{name}_{group}"), mus=mu, covs=cov, logp=dummy_ll, @@ -1232,7 +1374,7 @@ def _sample_conditional( obs_cov = Z @ cov @ pt.swapaxes(Z, -2, -1) + H SequenceMvNormal( - f"{name}_{group}_observed", + self._prefix_name(f"{name}_{group}_observed"), mus=obs_mu, covs=obs_cov, logp=dummy_ll, @@ -1247,13 +1389,14 @@ def _sample_conditional( frozen_model = freeze_dims_and_data(forward_model) with frozen_model: + var_names_to_sample = [ + self._prefix_name(f"{name}_{group}{suffix}") + for name in FILTER_OUTPUT_TYPES + for suffix in ["", "_observed"] + ] idata_conditional = pm.sample_posterior_predictive( group_idata, - var_names=[ - f"{name}_{group}{suffix}" - for name in FILTER_OUTPUT_TYPES - for suffix in ["", "_observed"] - ], + var_names=var_names_to_sample, random_seed=random_seed, compile_kwargs=compile_kwargs, **kwargs, @@ -1342,10 +1485,17 @@ def _sample_unconditional( else: steps = len(temp_coords[TIME_DIM]) - 1 - if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM]]): + if all( + [ + dim in self._fit_coords + for dim in [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM] + ] + ): dims = [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM] - with pm.Model(coords=temp_coords if dims is not None else None) as forward_model: + with pm.Model( + coords=temp_coords if dims is not None else None + ) as forward_model: self._build_dummy_graph() self._insert_random_variables() @@ -1358,12 +1508,13 @@ def _sample_unconditional( if not self.measurement_error: H_jittered = pm.Deterministic( - "H_jittered", pt.specify_shape(stabilize(H), (self.k_endog, self.k_endog)) + self._prefix_name("H_jittered"), + pt.specify_shape(stabilize(H), (self.k_endog, self.k_endog)), ) matrices = [x0, P0, c, d, T, Z, R, H_jittered, Q] LinearGaussianStateSpace( - group, + self._prefix_name(group), *matrices, steps=steps, dims=dims, @@ -1379,9 +1530,13 @@ def _sample_unconditional( frozen_model = freeze_dims_and_data(forward_model) with frozen_model: + var_names_to_sample = [ + self._prefix_name(f"{group}_latent"), + self._prefix_name(f"{group}_observed"), + ] idata_unconditional = pm.sample_posterior_predictive( group_idata, - var_names=[f"{group}_latent", f"{group}_observed"], + var_names=var_names_to_sample, random_seed=random_seed, compile_kwargs=compile_kwargs, **kwargs, @@ -1430,7 +1585,11 @@ def sample_conditional_prior( """ return self._sample_conditional( - idata=idata, group="prior", random_seed=random_seed, mvn_method=mvn_method, **kwargs + idata=idata, + group="prior", + random_seed=random_seed, + mvn_method=mvn_method, + **kwargs, ) def sample_conditional_posterior( @@ -1473,7 +1632,11 @@ def sample_conditional_posterior( """ return self._sample_conditional( - idata=idata, group="posterior", random_seed=random_seed, mvn_method=mvn_method, **kwargs + idata=idata, + group="posterior", + random_seed=random_seed, + mvn_method=mvn_method, + **kwargs, ) def sample_unconditional_prior( @@ -1611,7 +1774,11 @@ def sample_unconditional_posterior( ) def sample_statespace_matrices( - self, idata, matrix_names: str | list[str] | None, group: str = "posterior", **kwargs + self, + idata, + matrix_names: str | list[str] | None, + group: str = "posterior", + **kwargs, ): """ Draw samples of requested statespace matrices from provided idata @@ -1658,7 +1825,10 @@ def sample_statespace_matrices( long_name = SHORT_NAME_TO_LONG[short_name] if (long_name in matrix_names) or (short_name in matrix_names): name = long_name if long_name in matrix_names else short_name - dims = [x if x in self._fit_coords else None for x in MATRIX_DIMS[short_name]] + dims = [ + x if x in self._fit_coords else None + for x in MATRIX_DIMS[short_name] + ] pm.Deterministic(name, matrix, dims=dims) # TODO: Remove this after pm.Flat has its initial_value fixed @@ -1678,7 +1848,11 @@ def sample_statespace_matrices( return matrix_idata def sample_filter_outputs( - self, idata, filter_output_names: str | list[str] | None, group: str = "posterior", **kwargs + self, + idata, + filter_output_names: str | list[str] | None, + group: str = "posterior", + **kwargs, ): if isinstance(filter_output_names, str): filter_output_names = [filter_output_names] @@ -1690,8 +1864,12 @@ def sample_filter_outputs( filter_output_names, list(FILTER_OUTPUT_DIMS.keys()) ) if unknown_filter_output_names.size > 0: - raise ValueError(f"{unknown_filter_output_names} not a valid filter output name!") - filter_output_names = [x for x in FILTER_OUTPUT_DIMS.keys() if x in filter_output_names] + raise ValueError( + f"{unknown_filter_output_names} not a valid filter output name!" + ) + filter_output_names = [ + x for x in FILTER_OUTPUT_DIMS.keys() if x in filter_output_names + ] compile_kwargs = kwargs.pop("compile_kwargs", {}) compile_kwargs.setdefault("mode", self.mode) @@ -1716,6 +1894,7 @@ def sample_filter_outputs( n_obs=self.ssm.k_endog, obs_coords=obs_coords, register_data=True, + data_name=self._prefix_name("data"), ) filter_outputs = self.kalman_filter.build_graph( @@ -1760,7 +1939,9 @@ def _validate_forecast_args( verbose: bool = True, ): if isinstance(start, pd.Timestamp) and start not in time_index: - raise ValueError("Datetime start must be in the data index used to fit the model.") + raise ValueError( + "Datetime start must be in the data index used to fit the model." + ) elif isinstance(start, int): if abs(start) > len(time_index): raise ValueError( @@ -1773,11 +1954,17 @@ def _validate_forecast_args( if periods is not None and end is not None: raise ValueError("Must specify exactly one of either periods or end") if scenario is None and use_scenario_index: - raise ValueError("use_scenario_index=True requires a scenario to be provided.") + raise ValueError( + "use_scenario_index=True requires a scenario to be provided." + ) if scenario is not None and use_scenario_index: if isinstance(scenario, dict): first_df = next( - (df for df in scenario.values() if isinstance(df, pd.DataFrame | pd.Series)), + ( + df + for df in scenario.values() + if isinstance(df, pd.DataFrame | pd.Series) + ), None, ) if first_df is None: @@ -1788,14 +1975,22 @@ def _validate_forecast_args( raise ValueError( "use_scenario_index=True requires a scenario to be a DataFrame or Series." ) - if use_scenario_index and any(arg is not None for arg in [start, end, periods]) and verbose: + if ( + use_scenario_index + and any(arg is not None for arg in [start, end, periods]) + and verbose + ): _log.warning( "start, end, and periods arguments are ignored when use_scenario_index is True. Pass only " "one or the other to avoid this warning, or pass verbose = False." ) def _get_fit_time_index(self) -> pd.RangeIndex | pd.DatetimeIndex: - time_index = self._fit_coords.get(TIME_DIM, None) if self._fit_coords is not None else None + time_index = ( + self._fit_coords.get(TIME_DIM, None) + if self._fit_coords is not None + else None + ) if time_index is None: raise ValueError( "No time dimension found on coordinates used to fit the model. Has this model been fit?" @@ -1811,7 +2006,9 @@ def _get_fit_time_index(self) -> pd.RangeIndex | pd.DatetimeIndex: def _validate_scenario_data( self, - scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None, + scenario: ( + pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None + ), name: str | None = None, verbose=True, ): @@ -1868,9 +2065,13 @@ def _validate_scenario_data( # For checking shapes, the first object will always be good enough. But we also need to make sure all the # indices agree, so we grab the first dataframe (which might not exist, but that's OK) first_scenario = next(iter(scenario.values())) - first_df = next((df for df in scenario.values() if isinstance(df, pd.DataFrame)), None) + first_df = next( + (df for df in scenario.values() if isinstance(df, pd.DataFrame)), None + ) - if not all(data.shape[0] == first_scenario.shape[0] for data in scenario.values()): + if not all( + data.shape[0] == first_scenario.shape[0] for data in scenario.values() + ): raise ValueError( "Scenario data must have the same number of time steps for all variables." ) @@ -1880,7 +2081,9 @@ def _validate_scenario_data( for df in scenario.values() if isinstance(df, pd.DataFrame) ): - raise ValueError("Scenario data must have the same index for all variables.") + raise ValueError( + "Scenario data must have the same index for all variables." + ) return scenario @@ -1904,9 +2107,9 @@ def _validate_scenario_data( # Omit dataframe from this basic shape check so we can give more detailed information about missing columns # in the next check - if not isinstance(scenario, pd.DataFrame | pd.Series) and scenario.shape[1] != len( - coords[name] - ): + if not isinstance(scenario, pd.DataFrame | pd.Series) and scenario.shape[ + 1 + ] != len(coords[name]): raise ValueError( f"Scenario data for variable '{name}' has the wrong number of columns. Expected " f"{len(coords[name])}, got {scenario.shape[1]}" @@ -2044,7 +2247,9 @@ def get_or_create_index(x, time_index, start=None): # date_range includes both the start and end date, but we're going to pop off the start later # (it will be interpreted as x0). So we need to add 1 to the periods so the user gets "periods" # number of forecasts back - forecast_index = pd.date_range(start, periods=periods + 1, freq=freq) + forecast_index = pd.date_range( + start, periods=periods + 1, freq=freq + ) else: # If the user provided a positive integer as start, directly interpret it as the start time. If its @@ -2054,7 +2259,9 @@ def get_or_create_index(x, time_index, start=None): if end is not None: forecast_index = pd.RangeIndex(start, end, step=1, dtype="int") if periods is not None: - forecast_index = pd.RangeIndex(start, start + periods + 1, step=1, dtype="int") + forecast_index = pd.RangeIndex( + start, start + periods + 1, step=1, dtype="int" + ) if is_datetime: if forecast_index.freq != time_index.freq: @@ -2076,12 +2283,16 @@ def get_or_create_index(x, time_index, start=None): def _finalize_scenario_initialization( self, - scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None, + scenario: ( + pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None + ), forecast_index: pd.RangeIndex | pd.DatetimeIndex, name=None, ): try: - var_to_dims = {key: info["dims"][1:] for key, info in self.data_info.items()} + var_to_dims = { + key: info["dims"][1:] for key, info in self.data_info.items() + } except NotImplementedError: return scenario @@ -2098,7 +2309,9 @@ def _finalize_scenario_initialization( if isinstance(scenario, dict): for name, data in scenario.items(): - scenario[name] = self._finalize_scenario_initialization(data, forecast_index, name) + scenario[name] = self._finalize_scenario_initialization( + data, forecast_index, name + ) return scenario # This was already checked as valid @@ -2114,7 +2327,9 @@ def _finalize_scenario_initialization( # lists and tuples were handled during validation, along with shape check, so just cast arrays to dataframes # with the correct index and columns if isinstance(scenario, np.ndarray): - scenario = pd.DataFrame(scenario, index=forecast_index, columns=coords[name]) + scenario = pd.DataFrame( + scenario, index=forecast_index, columns=coords[name] + ) return scenario @@ -2125,7 +2340,12 @@ def _build_forecast_model( temp_coords = self._fit_coords.copy() dims = None - if all([dim in temp_coords for dim in [filter_time_dim, ALL_STATE_DIM, OBS_STATE_DIM]]): + if all( + [ + dim in temp_coords + for dim in [filter_time_dim, ALL_STATE_DIM, OBS_STATE_DIM] + ] + ): dims = [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM] t0_idx = np.flatnonzero(time_index == t0)[0] @@ -2134,13 +2354,20 @@ def _build_forecast_model( temp_coords[TIME_DIM] = forecast_index mu_dims, cov_dims = None, None - if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM, ALL_STATE_AUX_DIM]]): + if all( + [ + dim in self._fit_coords + for dim in [TIME_DIM, ALL_STATE_DIM, ALL_STATE_AUX_DIM] + ] + ): mu_dims = ["data_time", ALL_STATE_DIM] cov_dims = ["data_time", ALL_STATE_DIM, ALL_STATE_AUX_DIM] with pm.Model(coords=temp_coords) as forecast_model: - (_, _, *matrices), grouped_outputs = self._kalman_filter_outputs_from_dummy_graph( - data_dims=["data_time", OBS_STATE_DIM], + (_, _, *matrices), grouped_outputs = ( + self._kalman_filter_outputs_from_dummy_graph( + data_dims=["data_time", OBS_STATE_DIM], + ) ) group_idx = FILTER_OUTPUT_TYPES.index(filter_output) @@ -2152,22 +2379,31 @@ def _build_forecast_model( } missing_data_vars = np.setdiff1d( - ar1=[*self.data_names, "data"], ar2=[k.name for k, _ in sub_dict.items()] + ar1=[*self.data_names, "data"], + ar2=[k.name for k, _ in sub_dict.items()], ) if missing_data_vars.size > 0: - raise ValueError(f"{missing_data_vars} data used for fitting not found!") + raise ValueError( + f"{missing_data_vars} data used for fitting not found!" + ) - mu_frozen, cov_frozen = graph_replace([mu, cov], replace=sub_dict, strict=True) + mu_frozen, cov_frozen = graph_replace( + [mu, cov], replace=sub_dict, strict=True + ) x0 = pm.Deterministic( - "x0_slice", mu_frozen[t0_idx], dims=mu_dims[1:] if mu_dims is not None else None + self._prefix_name("x0_slice"), + mu_frozen[t0_idx], + dims=mu_dims[1:] if mu_dims is not None else None, ) P0 = pm.Deterministic( - "P0_slice", cov_frozen[t0_idx], dims=cov_dims[1:] if cov_dims is not None else None + self._prefix_name("P0_slice"), + cov_frozen[t0_idx], + dims=cov_dims[1:] if cov_dims is not None else None, ) _ = LinearGaussianStateSpace( - "forecast", + self._prefix_name("forecast"), x0, P0, *matrices, @@ -2187,7 +2423,9 @@ def forecast( start: int | pd.Timestamp | None = None, periods: int | None = None, end: int | pd.Timestamp = None, - scenario: pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None = None, + scenario: ( + pd.DataFrame | np.ndarray | dict[str, pd.DataFrame | np.ndarray] | None + ) = None, use_scenario_index: bool = False, filter_output="smoothed", random_seed: RandomState | None = None, @@ -2341,9 +2579,13 @@ def forecast( frozen_model = freeze_dims_and_data(forecast_model) with frozen_model: + var_names_to_sample = [ + self._prefix_name("forecast_latent"), + self._prefix_name("forecast_observed"), + ] idata_forecast = pm.sample_posterior_predictive( idata, - var_names=["forecast_latent", "forecast_observed"], + var_names=var_names_to_sample, random_seed=random_seed, compile_kwargs=compile_kwargs, **kwargs, @@ -2437,7 +2679,9 @@ def impulse_response_function( compile_kwargs.setdefault("mode", self.mode) if n_options > 1: - raise ValueError("Specify exactly 0 or 1 of shock_size, shock_cov, or shock_trajectory") + raise ValueError( + "Specify exactly 0 or 1 of shock_size, shock_cov, or shock_trajectory" + ) elif n_options == 1: # If the user passed an alternative parameterization for the shocks of the IRF, don't use the posterior use_posterior_cov = False @@ -2468,7 +2712,11 @@ def impulse_response_function( self._insert_random_variables() P0, _, c, d, T, Z, R, H, post_Q = self.unpack_statespace() - x0 = pm.Deterministic("x0_new", pt.zeros(self.k_states), dims=[ALL_STATE_DIM]) + x0 = pm.Deterministic( + self._prefix_name("x0_new"), + pt.zeros(self.k_states), + dims=[ALL_STATE_DIM], + ) if use_posterior_cov: Q = post_Q @@ -2483,11 +2731,15 @@ def impulse_response_function( shock_trajectory = pt.zeros((n_steps, self.k_posdef)) if Q is not None: init_shock = pm.MvNormal( - "initial_shock", mu=0, cov=Q, dims=[SHOCK_DIM], method=mvn_method + self._prefix_name("initial_shock"), + mu=0, + cov=Q, + dims=[SHOCK_DIM], + method=mvn_method, ) else: init_shock = pm.Deterministic( - "initial_shock", + self._prefix_name("initial_shock"), pt.as_tensor_variable(np.atleast_1d(shock_size)), dims=[SHOCK_DIM], ) @@ -2509,11 +2761,13 @@ def irf_step(shock, x, c, T, R): strict=True, ) - pm.Deterministic("irf", irf, dims=[TIME_DIM, ALL_STATE_DIM]) + pm.Deterministic( + self._prefix_name("irf"), irf, dims=[TIME_DIM, ALL_STATE_DIM] + ) irf_idata = pm.sample_posterior_predictive( idata, - var_names=["irf"], + var_names=[self._prefix_name("irf")], random_seed=random_seed, compile_kwargs=compile_kwargs, **kwargs, diff --git a/pymc_extras/statespace/utils/data_tools.py b/pymc_extras/statespace/utils/data_tools.py index cbc5d517c..b145b2ebe 100644 --- a/pymc_extras/statespace/utils/data_tools.py +++ b/pymc_extras/statespace/utils/data_tools.py @@ -35,7 +35,9 @@ def get_data_dims(data): return data_dims -def _validate_data_shape(data_shape, n_obs, obs_coords=None, check_col_names=False, col_names=None): +def _validate_data_shape( + data_shape, n_obs, obs_coords=None, check_col_names=False, col_names=None +): if col_names is None: col_names = [] @@ -121,7 +123,28 @@ def preprocess_pandas_data(data, n_obs, obs_coords=None, check_column_names=Fals return preprocess_numpy_data(data.values, n_obs, obs_coords) -def add_data_to_active_model(values, index, data_dims=None): +def add_data_to_active_model(values, index, data_dims=None, name="data"): + """ + Add data to the active PyMC model. + + Parameters + ---------- + values : array-like + The data values to add + index : array-like + The time index for the data + data_dims : list of str, optional + The dimensions for the data. Defaults to [TIME_DIM, OBS_STATE_DIM] + name : str, optional + The name for the data variable in the PyMC model. Defaults to "data". + When using multiple state space models, this should be prefixed with the model name + to avoid naming conflicts. + + Returns + ------- + data : pm.Data + The PyMC Data variable + """ pymc_mod = modelcontext(None) if data_dims is None: data_dims = [TIME_DIM, OBS_STATE_DIM] @@ -146,7 +169,7 @@ def add_data_to_active_model(values, index, data_dims=None): else: data_shape = (None, values.shape[-1]) - data = pm.Data("data", values, dims=data_dims, shape=data_shape) + data = pm.Data(name, values, dims=data_dims, shape=data_shape) return data @@ -178,8 +201,42 @@ def mask_missing_values_in_data(values, missing_fill_value=None): def register_data_with_pymc( - data, n_obs, obs_coords, register_data=True, missing_fill_value=None, data_dims=None + data, + n_obs, + obs_coords, + register_data=True, + missing_fill_value=None, + data_dims=None, + data_name="data", ): + """ + Register data with the active PyMC model. + + Parameters + ---------- + data : pytensor.TensorVariable, numpy.ndarray, pandas.DataFrame, or pandas.Series + The data to register + n_obs : int + Number of observed variables + obs_coords : list + Coordinates for the observations + register_data : bool, optional + Whether to register the data with PyMC. Defaults to True. + missing_fill_value : float, optional + Value to use for missing data. Defaults to MISSING_FILL. + data_dims : list of str, optional + Dimensions for the data variable + data_name : str, optional + Name for the data variable in the PyMC model. Defaults to "data". + When using multiple state space models, this should be prefixed with the model name. + + Returns + ------- + data : pm.Data or pytensor.shared + The registered data variable + nan_mask : numpy.ndarray + Boolean mask indicating missing values + """ if isinstance(data, pt.TensorVariable | TensorSharedVariable): values, index = preprocess_tensor_data(data, n_obs, obs_coords) elif isinstance(data, np.ndarray): @@ -187,12 +244,14 @@ def register_data_with_pymc( elif isinstance(data, pd.DataFrame | pd.Series): values, index = preprocess_pandas_data(data, n_obs, obs_coords) else: - raise ValueError("Data should be one of pytensor tensor, numpy array, or pandas dataframe") + raise ValueError( + "Data should be one of pytensor tensor, numpy array, or pandas dataframe" + ) data, nan_mask = mask_missing_values_in_data(values, missing_fill_value) if register_data: - data = add_data_to_active_model(data, index, data_dims) + data = add_data_to_active_model(data, index, data_dims, name=data_name) else: - data = pytensor.shared(data, name="data") + data = pytensor.shared(data, name=data_name) return data, nan_mask