diff --git a/pymc_extras/statespace/models/structural/components/seasonality.py b/pymc_extras/statespace/models/structural/components/seasonality.py index 1d103fbf0..341bd6ae5 100644 --- a/pymc_extras/statespace/models/structural/components/seasonality.py +++ b/pymc_extras/statespace/models/structural/components/seasonality.py @@ -14,24 +14,29 @@ class TimeSeasonality(Component): ---------- season_length: int The number of periods in a single seasonal cycle, e.g. 12 for monthly data with annual seasonal pattern, 7 for - daily data with weekly seasonal pattern, etc. + daily data with weekly seasonal pattern, etc. It must be greater than one. + + duration: int, default 1 + Number of time steps for each seasonal period. + This determines how long each seasonal period is held constant before moving to the next. innovations: bool, default True Whether to include stochastic innovations in the strength of the seasonal effect name: str, default None A name for this seasonal component. Used to label dimensions and coordinates. Useful when multiple seasonal - components are included in the same model. Default is ``f"Seasonal[s={season_length}]"`` + components are included in the same model. Default is ``f"Seasonal[s={season_length}, d={duration}]"`` state_names: list of str, default None - List of strings for seasonal effect labels. If provided, it must be of length ``season_length``. An example - would be ``state_names = ['Mon', 'Tue', 'Wed', 'Thur', 'Fri', 'Sat', 'Sun']`` when data is daily with a weekly + List of strings for seasonal effect labels. If provided, it must be of length ``season_length`` times ``duration``. + An example would be ``state_names = ['Mon', 'Tue', 'Wed', 'Thur', 'Fri', 'Sat', 'Sun']`` when data is daily with a weekly seasonal pattern (``season_length = 7``). - If None, states will be numbered ``[State_0, ..., State_s]`` + If None and ``duration = 1``, states will be named as ``[State_0, ..., State_s-1]`` (here s is ``season_length``). + If None and ``duration > 1``, states will be named as ``[State_0_0, ..., State_s-1_d-1]`` (here d is ``duration``). remove_first_state: bool, default True - If True, the first state will be removed from the model. This is done because there are only n-1 degrees of + If True, the first state will be removed from the model. This is done because there are only ``season_length-1`` degrees of freedom in the seasonal component, and one state is not identified. If False, the first state will be included in the model, but it will not be identified -- you will need to handle this in the priors (e.g. with ZeroSumNormal). @@ -41,17 +46,76 @@ class TimeSeasonality(Component): Notes ----- - A seasonal effect is any pattern that repeats every fixed interval. Although there are many possible ways to - model seasonal effects, the implementation used here is the one described by [1] as the "canonical" time domain - representation. The seasonal component can be expressed: + A seasonal effect is any pattern that repeats at fixed intervals. There are several ways to model such effects; + here, we present two models that are straightforward extensions of those described in [1]. + + **First model** (``remove_first_state=True``) + + In this model, the state vector is defined as: + + .. math:: + \alpha_t :=(\gamma_t, \ldots, \gamma_{t-d(s-1)+1}), \quad t \ge 0. + + This vector has length :math:`d(s-1)`, where: + + - :math:`s` is the ``seasonal_length`` parameter, and + - :math:`d` is the ``duration`` parameter. + + The components of the initial vector :math:`\alpha_{0}` are given by + + .. math:: + \gamma_{-l} := \tilde{\gamma}_{k_l}, \quad \text{where} \quad k_l := \left\lfloor \frac{l}{d} \right\rfloor \bmod s \quad \text{and} \quad l=0,\ldots, d(s-1)-1. + + Here, the values .. math:: - \gamma_t = -\sum_{i=1}^{s-1} \gamma_{t-i} + \omega_t, \quad \omega_t \sim N(0, \sigma_\gamma) + \tilde{\gamma}_{0}, \ldots, \tilde{\gamma}_{s-2}, - Where :math:`s` is the ``seasonal_length`` parameter and :math:`\omega_t` is the (optional) stochastic innovation. - To give interpretation to the :math:`\gamma` terms, it is helpful to work through the algebra for a simple - example. Let :math:`s=4`, and omit the shock term. Define initial conditions :math:`\gamma_0, \gamma_{-1}, - \gamma_{-2}`. The value of the seasonal component for the first 5 timesteps will be: + represent the initial seasonal states. The transition matrix of this model is the :math:`d(s-1) \times d(s-1)` matrix + + .. math:: + \begin{bmatrix} + -\mathbf{1}_d & -\mathbf{1}_d & \cdots & -\mathbf{1}_d & -\mathbf{1}_d \\ + \mathbf{1}_d & \mathbf{0}_d & \cdots & \mathbf{0}_d & \mathbf{0}_d \\ + \mathbf{0}_d & \mathbf{1}_d & \cdots & \mathbf{0}_d & \mathbf{0}_d \\ + \vdots & \vdots & \ddots & \vdots \\ + \mathbf{0}_d & \mathbf{0}_d & \cdots & \mathbf{1}_d & \mathbf{0}_d + \end{bmatrix} + + where :math:`\mathbf{1}_d` and :math:`\mathbf{0}_d` denote the :math:`d \times d` identity and null matrices, respectively. + + **Second model** (``remove_first_state=False``) + + In contrast, the state vector in the second model is defined as: + + .. math:: + \alpha_t=(\gamma_t, \ldots, \gamma_{t-ds+1}), \quad t \ge 0. + + This vector has length :math:`ds`. The components of the initial state vector :math:`\alpha_{0}` are defined similarly: + + .. math:: + \gamma_{-l} := \tilde{\gamma}_{k_l}, \quad \text{where} \quad k_l := \left\lfloor \frac{l}{d} \right\rfloor \bmod s \quad \text{and} \quad l=0,\ldots, ds-1. + + In this case, the initial seasonal states :math:`\tilde{\gamma}_{0}, \ldots, \tilde{\gamma}_{s-1}` are required to satisfy the following condition: + + .. math:: + \sum_{i=0}^{s-1} \tilde{\gamma}_{i} = 0. + + The transition matrix of this model is the following :math:`ds \times ds` circulant matrix: + + .. math:: + \begin{bmatrix} + 0 & 1 & 0 & \cdots & 0 \\ + 0 & 0 & 1 & \cdots & 0 \\ + \vdots & \vdots & \ddots & \ddots & \vdots \\ + 0 & 0 & \cdots & 0 & 1 \\ + 1 & 0 & \cdots & 0 & 0 + \end{bmatrix} + + To give interpretation to the :math:`\gamma` terms, it is helpful to work through the algebra for a simple + example. Let :math:`s=4`, :math:`d=1`, ``remove_first_state=True``, and omit the shock term. Then, we have + :math:`\gamma_{-i} = \tilde{\gamma}_{-i}`, for :math:`i=-2,\ldots, 0` and the value of the seasonal component + for the first 5 timesteps will be: .. math:: \begin{align} @@ -85,10 +149,38 @@ class TimeSeasonality(Component): And so on. So for interpretation, the ``season_length - 1`` initial states are, when reversed, the coefficients associated with ``state_names[1:]``. + In the next example, we set :math:`s=2`, :math:`d=2`, ``remove_first_state=True``, and omit the shock term. + By definition, the initial vector :math:`\alpha_{0}` is + + .. math:: + \alpha_0=(\tilde{\gamma}_{0}, \tilde{\gamma}_{0}, \tilde{\gamma}_{-1}, \tilde{\gamma}_{-1}) + + and the transition matrix is + + .. math:: + \begin{bmatrix} + -1 & 0 & -1 & 0 \\ + 0 & -1 & 0 & -1 \\ + 1 & 0 & 0 & 0 \\ + 0 & 1 & 0 & 0 \\ + \end{bmatrix} + + It is easy to verify that: + + .. math:: + \begin{align} + \gamma_1 &= -\tilde{\gamma}_0 - \tilde{\gamma}_{-1}\\ + \gamma_2 &= -(-\tilde{\gamma}_0 - \tilde{\gamma}_{-1})-\tilde{\gamma}_0\\ + &= \tilde{\gamma}_{-1}\\ + \gamma_3 &= -\tilde{\gamma}_{-1} +(\tilde{\gamma}_0 + \tilde{\gamma}_{-1})\\ + &= \tilde{\gamma}_{0}\\ + \gamma_4 &= -\tilde{\gamma}_0 - \tilde{\gamma}_{-1}.\\ + \end{align} + .. warning:: - Although the ``state_names`` argument expects a list of length ``season_length``, only ``state_names[1:]`` - will be saved as model dimensions, since the 1st coefficient is not identified (it is defined as - :math:`-\sum_{i=1}^{s} \gamma_{t-i}`). + Although the ``state_names`` argument expects a list of length ``season_length`` times ``duration``, + only ``state_names[duration:]`` will be saved as model dimensions, since the first coefficient is not identified + (it is defined as :math:`-\sum_{i=1}^{s-1} \tilde{\gamma}_{-i}`). Examples -------- @@ -137,6 +229,7 @@ class TimeSeasonality(Component): def __init__( self, season_length: int, + duration: int = 1, innovations: bool = True, name: str | None = None, state_names: list | None = None, @@ -146,29 +239,42 @@ def __init__( if observed_state_names is None: observed_state_names = ["data"] + if season_length <= 1 or not isinstance(season_length, int): + raise ValueError( + f"season_length must be an integer greater than 1, got {season_length}" + ) + if duration <= 0 or not isinstance(duration, int): + raise ValueError(f"duration must be a positive integer, got {duration}") if name is None: - name = f"Seasonal[s={season_length}]" + name = f"Seasonal[s={season_length}, d={duration}]" if state_names is None: - state_names = [f"{name}_{i}" for i in range(season_length)] + if duration > 1: + state_names = [ + f"{name}_{i}_{j}" for i in range(season_length) for j in range(duration) + ] + else: + state_names = [f"{name}_{i}" for i in range(season_length)] else: - if len(state_names) != season_length: + if len(state_names) != season_length * duration: raise ValueError( - f"state_names must be a list of length season_length, got {len(state_names)}" + f"state_names must be a list of length season_length*duration, got {len(state_names)}" ) state_names = state_names.copy() self.innovations = innovations + self.duration = duration self.remove_first_state = remove_first_state + self.season_length = season_length if self.remove_first_state: # In traditional models, the first state isn't identified, so we can help out the user by automatically # discarding it. # TODO: Can this be stashed and reconstructed automatically somehow? - state_names.pop(0) + state_names = state_names[duration:] self.provided_state_names = state_names - k_states = season_length - int(self.remove_first_state) + k_states = (season_length - int(self.remove_first_state)) * duration k_endog = len(observed_state_names) k_posdef = int(innovations) @@ -230,19 +336,40 @@ def populate_component_properties(self): def make_symbolic_graph(self) -> None: k_states = self.k_states // self.k_endog + duration = self.duration + k_unique_states = k_states // duration k_posdef = self.k_posdef // self.k_endog k_endog = self.k_endog if self.remove_first_state: # In this case, parameters are normalized to sum to zero, so the current state is the negative sum of # all previous states. - T = np.eye(k_states, k=-1) - T[0, :] = -1 + zero_d = pt.zeros((self.duration, self.duration)) + id_d = pt.eye(self.duration) + + row_blocks = [] + + # First row: all -1_d blocks + first_row = [-id_d for _ in range(self.season_length - 1)] + row_blocks.append(pt.concatenate(first_row, axis=1)) + + # Rows 2 to season_length-1: shifted identity blocks + for i in range(self.season_length - 2): + row = [] + for j in range(self.season_length - 1): + if j == i: + row.append(id_d) + else: + row.append(zero_d) + row_blocks.append(pt.concatenate(row, axis=1)) + + # Stack blocks + T = pt.concatenate(row_blocks, axis=0) else: # In this case we assume the user to be responsible for ensuring the states sum to zero, so T is just a # circulant matrix that cycles between the states. - T = np.eye(k_states, k=1) - T[-1, 0] = 1 + T = pt.eye(k_states, k=1) + T = pt.set_subtensor(T[-1, 0], 1) self.ssm["transition", :, :] = pt.linalg.block_diag(*[T for _ in range(k_endog)]) @@ -250,9 +377,15 @@ def make_symbolic_graph(self) -> None: self.ssm["design", :, :] = pt.linalg.block_diag(*[Z for _ in range(k_endog)]) initial_states = self.make_and_register_variable( - f"coefs_{self.name}", shape=(k_states,) if k_endog == 1 else (k_endog, k_states) + f"coefs_{self.name}", + shape=(k_unique_states,) if k_endog == 1 else (k_endog, k_unique_states), ) - self.ssm["initial_state", :] = initial_states.ravel() + if k_endog == 1: + self.ssm["initial_state", :] = pt.extra_ops.repeat(initial_states, duration, axis=0) + else: + self.ssm["initial_state", :] = pt.extra_ops.repeat( + initial_states, duration, axis=1 + ).ravel() if self.innovations: R = pt.zeros((k_states, k_posdef))[0, 0].set(1.0) diff --git a/tests/statespace/models/structural/components/test_seasonality.py b/tests/statespace/models/structural/components/test_seasonality.py index 93dcb0a5a..162b25868 100644 --- a/tests/statespace/models/structural/components/test_seasonality.py +++ b/tests/statespace/models/structural/components/test_seasonality.py @@ -14,6 +14,7 @@ @pytest.mark.parametrize("s", [10, 25, 50]) +@pytest.mark.parametrize("d", [1, 3]) @pytest.mark.parametrize("innovations", [True, False]) @pytest.mark.parametrize("remove_first_state", [True, False]) @pytest.mark.filterwarnings( @@ -21,55 +22,61 @@ "ignore:overflow encountered in matmul:RuntimeWarning", "ignore:invalid value encountered in matmul:RuntimeWarning", ) -def test_time_seasonality(s, innovations, remove_first_state, rng): +def test_time_seasonality(s, d, innovations, remove_first_state, rng): def random_word(rng): return "".join(rng.choice(list("abcdefghijklmnopqrstuvwxyz")) for _ in range(5)) - state_names = [random_word(rng) for _ in range(s)] + state_names = [random_word(rng) for _ in range(s * d)] mod = st.TimeSeasonality( season_length=s, + duration=d, innovations=innovations, name="season", state_names=state_names, remove_first_state=remove_first_state, ) - x0 = np.zeros(mod.k_states, dtype=config.floatX) + x0 = np.zeros(mod.k_states // mod.duration, dtype=config.floatX) x0[0] = 1 params = {"coefs_season": x0} if innovations: params["sigma_season"] = 0.0 - x, y = simulate_from_numpy_model(mod, rng, params) + x, y = simulate_from_numpy_model(mod, rng, params, steps=100 * mod.duration) y = y.ravel() if not innovations: - assert_pattern_repeats(y, s, atol=ATOL, rtol=RTOL) + assert_pattern_repeats(y, s * d, atol=ATOL, rtol=RTOL) # Check coords mod = mod.build(verbose=False) _assert_basic_coords_correct(mod) - test_slice = slice(1, None) if remove_first_state else slice(None) + test_slice = slice(d, None) if remove_first_state else slice(None) assert mod.coords["state_season"] == state_names[test_slice] +@pytest.mark.parametrize("d", [1, 3]) @pytest.mark.parametrize( "remove_first_state", [True, False], ids=["remove_first_state", "keep_first_state"] ) -def test_time_seasonality_multiple_observed(rng, remove_first_state): +def test_time_seasonality_multiple_observed(rng, d, remove_first_state): s = 3 - state_names = [f"state_{i}" for i in range(s)] + state_names = [f"state_{i}_{j}" for i in range(s) for j in range(d)] mod = st.TimeSeasonality( season_length=s, + duration=d, innovations=True, name="season", state_names=state_names, observed_state_names=["data_1", "data_2"], remove_first_state=remove_first_state, ) - x0 = np.zeros((mod.k_endog, mod.k_states // mod.k_endog), dtype=config.floatX) + x0 = np.zeros((mod.k_endog, mod.k_states // mod.k_endog // mod.duration), dtype=config.floatX) expected_states = [ - f"state_{i}[data_{j}]" for j in range(1, 3) for i in range(int(remove_first_state), s) + f"state_{i}_{j}[data_{k}]" + for k in range(1, 3) + for i in range(int(remove_first_state), s) + for j in range(d) ] assert mod.state_names == expected_states assert mod.shock_names == ["season[data_1]", "season[data_2]"] @@ -79,9 +86,9 @@ def test_time_seasonality_multiple_observed(rng, remove_first_state): params = {"coefs_season": x0, "sigma_season": np.array([0.0, 0.0], dtype=config.floatX)} - x, y = simulate_from_numpy_model(mod, rng, params, steps=123) - assert_pattern_repeats(y[:, 0], s, atol=ATOL, rtol=RTOL) - assert_pattern_repeats(y[:, 1], s, atol=ATOL, rtol=RTOL) + x, y = simulate_from_numpy_model(mod, rng, params, steps=123 * d) + assert_pattern_repeats(y[:, 0], s * d, atol=ATOL, rtol=RTOL) + assert_pattern_repeats(y[:, 1], s * d, atol=ATOL, rtol=RTOL) mod = mod.build(verbose=False) x0, *_, T, Z, R, _, Q = mod._unpack_statespace_with_placeholders() @@ -97,36 +104,38 @@ def test_time_seasonality_multiple_observed(rng, remove_first_state): params["sigma_season"] = np.array([0.1, 0.8], dtype=config.floatX) x0, T, Z, R, Q = fn(**params) + # Because the dimension of the observed states is 2, + # the expected T is the diagonal block matrix [[T0, 0], [0, T0]] + # where T0 is the transition matrix we would have if the + # seasonality were not multiple observed. + mod0 = st.TimeSeasonality(season_length=s, duration=d, remove_first_state=remove_first_state) + T0 = mod0.ssm["transition"].eval() + if remove_first_state: - expected_x0 = np.array([1.0, 0.0, 2.0, 0.0]) - - expected_T = np.array( - [ - [-1.0, -1.0, 0.0, 0.0], - [1.0, 0.0, 0.0, 0.0], - [0.0, 0.0, -1.0, -1.0], - [0.0, 0.0, 1.0, 0.0], - ] + expected_x0 = np.repeat(np.array([1.0, 0.0, 2.0, 0.0]), d) + expected_T = np.block( + [[T0, np.zeros((d * (s - 1), d * (s - 1)))], [np.zeros((d * (s - 1), d * (s - 1))), T0]] + ) + expected_R = np.array( + [[1.0, 1.0]] + [[0.0, 0.0]] * (2 * d - 1) + [[1.0, 1.0]] + [[0.0, 0.0]] * (2 * d - 1) ) - expected_R = np.array([[1.0, 1.0], [0.0, 0.0], [1.0, 1.0], [0.0, 0.0]]) - expected_Z = np.array([[1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 1.0, 0.0]]) + Z0 = np.zeros((2, d * (s - 1))) + Z0[0, 0] = 1 + Z1 = np.zeros((2, d * (s - 1))) + Z1[1, 0] = 1 + expected_Z = np.block([[Z0, Z1]]) else: - expected_x0 = np.array([1.0, 0.0, 0.0, 2.0, 0.0, 0.0]) - expected_T = np.array( - [ - [0.0, 1.0, 0.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 1.0, 0.0, 0.0, 0.0], - [1.0, 0.0, 0.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 0.0, 0.0, 1.0, 0.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 1.0], - [0.0, 0.0, 0.0, 1.0, 0.0, 0.0], - ] - ) + expected_x0 = np.repeat(np.array([1.0, 0.0, 0.0, 2.0, 0.0, 0.0]), d) + expected_T = np.block([[T0, np.zeros((s * d, s * d))], [np.zeros((s * d, s * d)), T0]]) expected_R = np.array( - [[1.0, 1.0], [0.0, 0.0], [0.0, 0.0], [1.0, 1.0], [0.0, 0.0], [0.0, 0.0]] + [[1.0, 1.0]] + [[0.0, 0.0]] * (s * d - 1) + [[1.0, 1.0]] + [[0.0, 0.0]] * (s * d - 1) ) - expected_Z = np.array([[1.0, 0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0, 0.0, 0.0]]) + Z0 = np.zeros((2, s * d)) + Z0[0, 0] = 1 + Z1 = np.zeros((2, s * d)) + Z1[1, 0] = 1 + expected_Z = np.block([[Z0, Z1]]) expected_Q = np.array([[0.1**2, 0.0], [0.0, 0.8**2]]) @@ -137,20 +146,23 @@ def test_time_seasonality_multiple_observed(rng, remove_first_state): np.testing.assert_allclose(matrix, expected) -def test_add_two_time_seasonality_different_observed(rng): +@pytest.mark.parametrize("d1, d2", [(1, 1), (1, 3), (3, 1), (3, 3)]) +def test_add_two_time_seasonality_different_observed(rng, d1, d2): mod1 = st.TimeSeasonality( season_length=3, + duration=d1, innovations=True, name="season1", - state_names=[f"state_{i}" for i in range(3)], + state_names=[f"state_{i}_{j}" for i in range(3) for j in range(d1)], observed_state_names=["data_1"], remove_first_state=False, ) mod2 = st.TimeSeasonality( season_length=5, + duration=d2, innovations=True, name="season2", - state_names=[f"state_{i}" for i in range(5)], + state_names=[f"state_{i}_{j}" for i in range(5) for j in range(d2)], observed_state_names=["data_2"], ) @@ -164,18 +176,22 @@ def test_add_two_time_seasonality_different_observed(rng): "initial_state_cov": np.eye(mod.k_states, dtype=config.floatX), } - x, y = simulate_from_numpy_model(mod, rng, params, steps=3 * 5 * 5) - assert_pattern_repeats(y[:, 0], 3, atol=ATOL, rtol=RTOL) - assert_pattern_repeats(y[:, 1], 5, atol=ATOL, rtol=RTOL) + x, y = simulate_from_numpy_model(mod, rng, params, steps=3 * 5 * 5 * d1 * d2) + assert_pattern_repeats(y[:, 0], 3 * d1, atol=ATOL, rtol=RTOL) + assert_pattern_repeats(y[:, 1], 5 * d2, atol=ATOL, rtol=RTOL) assert mod.state_names == [ - "state_0[data_1]", - "state_1[data_1]", - "state_2[data_1]", - "state_1[data_2]", - "state_2[data_2]", - "state_3[data_2]", - "state_4[data_2]", + item + for sublist in [ + [f"state_0_{j}[data_1]" for j in range(d1)], + [f"state_1_{j}[data_1]" for j in range(d1)], + [f"state_2_{j}[data_1]" for j in range(d1)], + [f"state_1_{j}[data_2]" for j in range(d2)], + [f"state_2_{j}[data_2]" for j in range(d2)], + [f"state_3_{j}[data_2]" for j in range(d2)], + [f"state_4_{j}[data_2]" for j in range(d2)], + ] + for item in sublist ] assert mod.shock_names == ["season1[data_1]", "season2[data_2]"] @@ -194,20 +210,19 @@ def test_add_two_time_seasonality_different_observed(rng): ) np.testing.assert_allclose( - np.array([1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 1.2]), x0, atol=ATOL, rtol=RTOL + np.repeat(np.array([1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 1.2]), [d1, d1, d1, d2, d2, d2, d2]), + x0, + atol=ATOL, + rtol=RTOL, ) + # The transition matrix T of mod is expected to be [[T1, 0], [0, T2]], + # where T1 and T2 are the transition matrices of mod1 and mod2, respectively. + T1 = mod1.ssm["transition"].eval() + T2 = mod2.ssm["transition"].eval() np.testing.assert_allclose( - np.array( - [ - [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], - [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 0.0, -1.0, -1.0, -1.0, -1.0], - [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0], - ] + np.block( + [[T1, np.zeros((T1.shape[0], T2.shape[1]))], [np.zeros((T2.shape[0], T1.shape[1])), T2]] ), T, atol=ATOL,