| 
 | 1 | +import numpy as np  | 
 | 2 | + | 
 | 3 | +from pytensor import tensor as pt  | 
 | 4 | + | 
 | 5 | +from pymc_extras.statespace.models.structural.core import Component  | 
 | 6 | +from pymc_extras.statespace.models.structural.utils import _frequency_transition_block  | 
 | 7 | + | 
 | 8 | + | 
 | 9 | +class CycleComponent(Component):  | 
 | 10 | +    r"""  | 
 | 11 | +    A component for modeling longer-term cyclical effects  | 
 | 12 | +
  | 
 | 13 | +    Parameters  | 
 | 14 | +    ----------  | 
 | 15 | +    name: str  | 
 | 16 | +        Name of the component. Used in generated coordinates and state names. If None, a descriptive name will be  | 
 | 17 | +        used.  | 
 | 18 | +
  | 
 | 19 | +    cycle_length: int, optional  | 
 | 20 | +        The length of the cycle, in the calendar units of your data. For example, if your data is monthly, and you  | 
 | 21 | +        want to model a 12-month cycle, use ``cycle_length=12``. You cannot specify both ``cycle_length`` and  | 
 | 22 | +        ``estimate_cycle_length``.  | 
 | 23 | +
  | 
 | 24 | +    estimate_cycle_length: bool, default False  | 
 | 25 | +        Whether to estimate the cycle length. If True, an additional parameter, ``cycle_length`` will be added to the  | 
 | 26 | +        model. You cannot specify both ``cycle_length`` and ``estimate_cycle_length``.  | 
 | 27 | +
  | 
 | 28 | +    dampen: bool, default False  | 
 | 29 | +        Whether to dampen the cycle by multiplying by a dampening factor :math:`\rho` at every timestep. If true,  | 
 | 30 | +        an additional parameter, ``dampening_factor`` will be added to the model.  | 
 | 31 | +
  | 
 | 32 | +    innovations: bool, default True  | 
 | 33 | +        Whether to include stochastic innovations in the strength of the seasonal effect. If True, an additional  | 
 | 34 | +        parameter, ``sigma_{name}`` will be added to the model.  | 
 | 35 | +
  | 
 | 36 | +    Notes  | 
 | 37 | +    -----  | 
 | 38 | +    The cycle component is very similar in implementation to the frequency domain seasonal component, expect that it  | 
 | 39 | +    is restricted to n=1. The cycle component can be expressed:  | 
 | 40 | +
  | 
 | 41 | +    .. math::  | 
 | 42 | +        \begin{align}  | 
 | 43 | +            \gamma_t &= \rho \gamma_{t-1} \cos \lambda + \rho \gamma_{t-1}^\star \sin \lambda + \omega_{t} \\  | 
 | 44 | +            \gamma_{t}^\star &= -\rho \gamma_{t-1} \sin \lambda + \rho \gamma_{t-1}^\star \cos \lambda + \omega_{t}^\star \\  | 
 | 45 | +            \lambda &= \frac{2\pi}{s}  | 
 | 46 | +        \end{align}  | 
 | 47 | +
  | 
 | 48 | +    Where :math:`s` is the ``cycle_length``. [1] recommend that this component be used for longer term cyclical  | 
 | 49 | +    effects, such as business cycles, and that the seasonal component be used for shorter term effects, such as  | 
 | 50 | +    weekly or monthly seasonality.  | 
 | 51 | +
  | 
 | 52 | +    Unlike a FrequencySeasonality component, the length of a CycleComponent can be estimated.  | 
 | 53 | +
  | 
 | 54 | +    Examples  | 
 | 55 | +    --------  | 
 | 56 | +    Estimate a business cycle with length between 6 and 12 years:  | 
 | 57 | +
  | 
 | 58 | +    .. code:: python  | 
 | 59 | +
  | 
 | 60 | +        from pymc_extras.statespace import structural as st  | 
 | 61 | +        import pymc as pm  | 
 | 62 | +        import pytensor.tensor as pt  | 
 | 63 | +        import pandas as pd  | 
 | 64 | +        import numpy as np  | 
 | 65 | +
  | 
 | 66 | +        data = np.random.normal(size=(100, 1))  | 
 | 67 | +
  | 
 | 68 | +        # Build the structural model  | 
 | 69 | +        grw = st.LevelTrendComponent(order=1, innovations_order=1)  | 
 | 70 | +        cycle = st.CycleComponent('business_cycle', estimate_cycle_length=True, dampen=False)  | 
 | 71 | +        ss_mod = (grw + cycle).build()  | 
 | 72 | +
  | 
 | 73 | +        # Estimate with PyMC  | 
 | 74 | +        with pm.Model(coords=ss_mod.coords) as model:  | 
 | 75 | +            P0 = pm.Deterministic('P0', pt.eye(ss_mod.k_states), dims=ss_mod.param_dims['P0'])  | 
 | 76 | +            intitial_trend = pm.Normal('initial_trend', dims=ss_mod.param_dims['initial_trend'])  | 
 | 77 | +            sigma_trend = pm.HalfNormal('sigma_trend', dims=ss_mod.param_dims['sigma_trend'])  | 
 | 78 | +
  | 
 | 79 | +            cycle_strength = pm.Normal('business_cycle')  | 
 | 80 | +            cycle_length = pm.Uniform('business_cycle_length', lower=6, upper=12)  | 
 | 81 | +
  | 
 | 82 | +            sigma_cycle = pm.HalfNormal('sigma_business_cycle', sigma=1)  | 
 | 83 | +            ss_mod.build_statespace_graph(data)  | 
 | 84 | +
  | 
 | 85 | +            idata = pm.sample(nuts_sampler='numpyro')  | 
 | 86 | +
  | 
 | 87 | +    References  | 
 | 88 | +    ----------  | 
 | 89 | +    .. [1] Durbin, James, and Siem Jan Koopman. 2012.  | 
 | 90 | +        Time Series Analysis by State Space Methods: Second Edition.  | 
 | 91 | +        Oxford University Press.  | 
 | 92 | +    """  | 
 | 93 | + | 
 | 94 | +    def __init__(  | 
 | 95 | +        self,  | 
 | 96 | +        name: str | None = None,  | 
 | 97 | +        cycle_length: int | None = None,  | 
 | 98 | +        estimate_cycle_length: bool = False,  | 
 | 99 | +        dampen: bool = False,  | 
 | 100 | +        innovations: bool = True,  | 
 | 101 | +        observed_state_names: list[str] | None = None,  | 
 | 102 | +    ):  | 
 | 103 | +        if observed_state_names is None:  | 
 | 104 | +            observed_state_names = ["data"]  | 
 | 105 | + | 
 | 106 | +        if cycle_length is None and not estimate_cycle_length:  | 
 | 107 | +            raise ValueError("Must specify cycle_length if estimate_cycle_length is False")  | 
 | 108 | +        if cycle_length is not None and estimate_cycle_length:  | 
 | 109 | +            raise ValueError("Cannot specify cycle_length if estimate_cycle_length is True")  | 
 | 110 | +        if name is None:  | 
 | 111 | +            cycle = int(cycle_length) if cycle_length is not None else "Estimate"  | 
 | 112 | +            name = f"Cycle[s={cycle}, dampen={dampen}, innovations={innovations}]"  | 
 | 113 | + | 
 | 114 | +        self.estimate_cycle_length = estimate_cycle_length  | 
 | 115 | +        self.cycle_length = cycle_length  | 
 | 116 | +        self.innovations = innovations  | 
 | 117 | +        self.dampen = dampen  | 
 | 118 | +        self.n_coefs = 1  | 
 | 119 | + | 
 | 120 | +        k_endog = len(observed_state_names)  | 
 | 121 | + | 
 | 122 | +        k_states = 2 * k_endog  | 
 | 123 | +        k_posdef = 2 * k_endog  | 
 | 124 | + | 
 | 125 | +        obs_state_idx = np.zeros(k_states)  | 
 | 126 | +        obs_state_idx[slice(0, k_states, 2)] = 1  | 
 | 127 | + | 
 | 128 | +        super().__init__(  | 
 | 129 | +            name=name,  | 
 | 130 | +            k_endog=k_endog,  | 
 | 131 | +            k_states=k_states,  | 
 | 132 | +            k_posdef=k_posdef,  | 
 | 133 | +            measurement_error=False,  | 
 | 134 | +            combine_hidden_states=True,  | 
 | 135 | +            obs_state_idxs=obs_state_idx,  | 
 | 136 | +            observed_state_names=observed_state_names,  | 
 | 137 | +        )  | 
 | 138 | + | 
 | 139 | +    def make_symbolic_graph(self) -> None:  | 
 | 140 | +        self.ssm["design", 0, slice(0, self.k_states, 2)] = 1  | 
 | 141 | +        self.ssm["selection", :, :] = np.eye(self.k_states)  | 
 | 142 | +        self.param_dims = {self.name: (f"{self.name}_state",)}  | 
 | 143 | +        self.coords = {f"{self.name}_state": self.state_names}  | 
 | 144 | + | 
 | 145 | +        init_state = self.make_and_register_variable(f"{self.name}", shape=(self.k_states,))  | 
 | 146 | + | 
 | 147 | +        self.ssm["initial_state", :] = init_state  | 
 | 148 | + | 
 | 149 | +        if self.estimate_cycle_length:  | 
 | 150 | +            lamb = self.make_and_register_variable(f"{self.name}_length", shape=())  | 
 | 151 | +        else:  | 
 | 152 | +            lamb = self.cycle_length  | 
 | 153 | + | 
 | 154 | +        if self.dampen:  | 
 | 155 | +            rho = self.make_and_register_variable(f"{self.name}_dampening_factor", shape=())  | 
 | 156 | +        else:  | 
 | 157 | +            rho = 1  | 
 | 158 | + | 
 | 159 | +        T = rho * _frequency_transition_block(lamb, j=1)  | 
 | 160 | +        self.ssm["transition", :, :] = T  | 
 | 161 | + | 
 | 162 | +        if self.innovations:  | 
 | 163 | +            sigma_cycle = self.make_and_register_variable(f"sigma_{self.name}", shape=())  | 
 | 164 | +            self.ssm["state_cov", :, :] = pt.eye(self.k_posdef) * sigma_cycle**2  | 
 | 165 | + | 
 | 166 | +    def populate_component_properties(self):  | 
 | 167 | +        self.state_names = [f"{self.name}_{f}" for f in ["Cos", "Sin"]]  | 
 | 168 | +        self.param_names = [f"{self.name}"]  | 
 | 169 | + | 
 | 170 | +        self.param_info = {  | 
 | 171 | +            f"{self.name}": {  | 
 | 172 | +                "shape": (2,),  | 
 | 173 | +                "constraints": None,  | 
 | 174 | +                "dims": (f"{self.name}_state",),  | 
 | 175 | +            }  | 
 | 176 | +        }  | 
 | 177 | + | 
 | 178 | +        if self.estimate_cycle_length:  | 
 | 179 | +            self.param_names += [f"{self.name}_length"]  | 
 | 180 | +            self.param_info[f"{self.name}_length"] = {  | 
 | 181 | +                "shape": (),  | 
 | 182 | +                "constraints": "Positive, non-zero",  | 
 | 183 | +                "dims": None,  | 
 | 184 | +            }  | 
 | 185 | + | 
 | 186 | +        if self.dampen:  | 
 | 187 | +            self.param_names += [f"{self.name}_dampening_factor"]  | 
 | 188 | +            self.param_info[f"{self.name}_dampening_factor"] = {  | 
 | 189 | +                "shape": (),  | 
 | 190 | +                "constraints": "0 < x ≤ 1",  | 
 | 191 | +                "dims": None,  | 
 | 192 | +            }  | 
 | 193 | + | 
 | 194 | +        if self.innovations:  | 
 | 195 | +            self.param_names += [f"sigma_{self.name}"]  | 
 | 196 | +            self.param_info[f"sigma_{self.name}"] = {  | 
 | 197 | +                "shape": (),  | 
 | 198 | +                "constraints": "Positive",  | 
 | 199 | +                "dims": None,  | 
 | 200 | +            }  | 
 | 201 | +            self.shock_names = self.state_names.copy()  | 
0 commit comments