Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 214 additions & 2 deletions src/hyperactive/integrations/sktime/_forecasting.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,24 @@ class ForecastingOptCV(_DelegatedForecaster):
- "logger_name": str, default="ray"; name of the logger to use.
- "mute_warnings": bool, default=False; if True, suppresses warnings

tune_by_instance : bool, optional (default=False)
Whether to tune parameters separately for each time series instance,
in case of Panel or Hierarchical data passed to the tuning estimator.
Only applies if time series passed are Panel or Hierarchical.
If True, clones of the forecaster will be fit to each instance separately,
and are available in fields of the ``forecasters_`` attribute.
Has the same effect as applying ForecastByLevel wrapper to self.
If False, the same best parameters are selected for all instances.

tune_by_variable : bool, optional (default=False)
Whether to tune parameters separately for each time series variable,
in case of multivariate data passed to the tuning estimator.
Only applies if time series passed are strictly multivariate.
If True, clones of the forecaster will be fit to each variable separately,
and are available in fields of the ``forecasters_`` attribute.
Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
If False, the same best parameters are selected for all variables.

Example
-------
Any available tuning engine from hyperactive can be used, for example:
Expand Down Expand Up @@ -215,6 +233,8 @@ def __init__(
cv_X=None,
backend=None,
backend_params=None,
tune_by_instance=False,
tune_by_variable=False,
):
self.forecaster = forecaster
self.optimizer = optimizer
Expand All @@ -227,6 +247,8 @@ def __init__(
self.cv_X = cv_X
self.backend = backend
self.backend_params = backend_params
self.tune_by_instance = tune_by_instance
self.tune_by_variable = tune_by_variable
super().__init__()

def _fit(self, y, X, fh):
Expand All @@ -245,12 +267,27 @@ def _fit(self, y, X, fh):
-------
self : returns an instance of self.
"""
# Handle broadcasting options when requested and applicable
if self.tune_by_instance or self.tune_by_variable:
broadcasted = self._fit_with_broadcasting(y, X, fh)
if broadcasted:
return self

return self._fit_single(y, X, fh)

def _fit_single(self, y, X, fh):
"""Run the core fit logic without broadcasting shortcuts."""
import time

from sktime.utils.validation.forecasting import check_scoring

forecaster = self.forecaster.clone()

scoring = check_scoring(self.scoring, obj=self)
# scoring_name = f"test_{scoring.name}"
self.scorer_ = scoring

# Count number of CV splits
self.n_splits_ = self.cv.get_n_splits(y)

experiment = SktimeForecastingExperiment(
forecaster=forecaster,
Expand All @@ -272,9 +309,31 @@ def _fit(self, y, X, fh):
self.best_params_ = best_params
self.best_forecaster_ = forecaster.set_params(**best_params)

# Refit model with best parameters.
# Store cv_results from optimizer if available
if hasattr(optimizer, "results"):
self.cv_results_ = optimizer.results
else:
# Create a basic cv_results_ dict
self.cv_results_ = {"best_params": best_params}

# Store best_index_ and best_score_ if available from optimizer
if hasattr(optimizer, "best_score"):
self.best_score_ = optimizer.best_score
else:
# Calculate best score by evaluating best params
best_score, _ = experiment.score(best_params)
self.best_score_ = best_score

self.best_index_ = 0 # For single best result

# Refit model with best parameters and track time.
if self.refit:
start_time = time.time()
self.best_forecaster_.fit(y=y, X=X, fh=fh)
end_time = time.time()
self.refit_time_ = end_time - start_time
else:
self.refit_time_ = 0.0

return self

Expand Down Expand Up @@ -311,6 +370,159 @@ def _predict(self, fh, X):
)
return super()._predict(fh=fh, X=X)

def _fit_with_broadcasting(self, y, X, fh):
    """Fit one tuner per instance or per variable, if applicable.

    Implements the ``tune_by_instance`` and ``tune_by_variable`` options.
    When ``tune_by_instance`` is set and ``y`` has a ``pd.MultiIndex``, a
    separate tuner is fitted per unique value of index level 0. Otherwise,
    when ``tune_by_variable`` is set and ``y`` is a ``pd.DataFrame`` with
    more than one column, a separate tuner is fitted per column.
    ``tune_by_instance`` takes precedence if both apply; in that case
    ``tune_by_variable`` is forwarded to the per-instance tuners.

    Sets ``scorer_`` and ``n_splits_`` in all cases. If broadcasting is
    performed, also sets ``forecasters_`` (one row per fitted tuner),
    ``best_forecaster_``, ``best_params_``, ``best_score_``,
    ``best_index_``, ``cv_results_`` and ``refit_time_``.

    Parameters
    ----------
    y : pd.Series or pd.DataFrame
        Target time series to which to fit the forecaster.
    X : pd.DataFrame, optional (default=None)
        Exogenous variables.
    fh : int, list or np.array, optional (default=None)
        The forecasting horizon with the steps ahead to predict.

    Returns
    -------
    bool
        True if broadcasting was performed, False otherwise.

    Raises
    ------
    RuntimeError
        If broadcasting applies but there was nothing to fit
        (for example, a MultiIndex without any entries).
    """
    # Local imports, in line with the other local imports of this method;
    # numpy is imported here so the method does not rely on a module-level
    # ``np`` being available.
    import numpy as np
    import pandas as pd
    from sktime.utils.validation.forecasting import check_scoring

    self.scorer_ = check_scoring(self.scoring, obj=self)
    self.n_splits_ = self.cv.get_n_splits(y)

    # Type-based checks instead of matching on the type's string name.
    is_panel = isinstance(getattr(y, "index", None), pd.MultiIndex)
    is_multivariate = isinstance(y, pd.DataFrame) and len(y.columns) > 1

    if self.tune_by_instance and is_panel:
        key_name = "instance"
        # Per-instance tuners may still broadcast over variables.
        child_tune_by_variable = self.tune_by_variable
        subsets = (
            (
                instance,
                y.loc[instance],
                X.loc[instance] if X is not None else None,
            )
            for instance in y.index.get_level_values(0).unique()
        )
    elif self.tune_by_variable and is_multivariate:
        key_name = "variable"
        child_tune_by_variable = False
        # Every variable is tuned against the same exogenous data.
        subsets = ((variable, y[[variable]], X) for variable in y.columns)
    else:
        # Broadcasting was requested but does not apply to this data;
        # the caller falls back to the regular fit.
        return False

    forecasters_list = [
        self._fit_broadcast_tuner(
            key_name, key, y_sub, X_sub, fh, child_tune_by_variable
        )
        for key, y_sub, X_sub in subsets
    ]
    self.forecasters_ = pd.DataFrame(forecasters_list)

    if not forecasters_list:
        raise RuntimeError(
            "Broadcasting was requested but no forecasters were fitted."
        )

    # Missing scores are ranked last.
    # NOTE(review): argmin assumes that lower ``best_score_`` values are
    # better - confirm this matches the convention of the tuner's scores.
    score_values = [
        np.inf if entry["best_score"] is None else entry["best_score"]
        for entry in forecasters_list
    ]
    best_index = int(np.argmin(score_values))
    best_entry = forecasters_list[best_index]

    self.best_forecaster_ = best_entry["forecaster"]
    self.best_params_ = best_entry["best_params"]
    self.best_score_ = best_entry["best_score"]
    self.best_index_ = best_index

    self.cv_results_ = {"forecasters": self.forecasters_}

    # Total refit time is the sum over all per-subset tuners.
    if self.refit:
        refit_times = [entry["refit_time"] for entry in forecasters_list]
        self.refit_time_ = float(np.sum(refit_times))
    else:
        self.refit_time_ = 0.0

    return True

def _fit_broadcast_tuner(self, key_name, key, y, X, fh, tune_by_variable):
    """Fit a fresh tuner of the same type on one data subset.

    Parameters
    ----------
    key_name : str
        Name of the identifying field in the returned record,
        ``"instance"`` or ``"variable"``.
    key : object
        Identifier of the subset (index level value or column name).
    y : pd.Series or pd.DataFrame
        Target data of the subset.
    X : pd.DataFrame or None
        Exogenous data passed to the subset tuner.
    fh : int, list or np.array, optional
        Forecasting horizon passed to the subset tuner.
    tune_by_variable : bool
        Value of ``tune_by_variable`` for the subset tuner;
        ``tune_by_instance`` is always disabled for it.

    Returns
    -------
    dict
        Record with the subset key, the tuned forecaster, its best
        parameters, best score and refit time.
    """
    tuner = type(self)(
        forecaster=self.forecaster.clone(),
        optimizer=self.optimizer.clone(),
        cv=self.cv,
        strategy=self.strategy,
        update_behaviour=self.update_behaviour,
        scoring=self.scoring,
        refit=self.refit,
        error_score=self.error_score,
        cv_X=self.cv_X,
        backend=self.backend,
        backend_params=self.backend_params,
        tune_by_instance=False,
        tune_by_variable=tune_by_variable,
    )
    tuner.fit(y, X=X, fh=fh)

    return {
        key_name: key,
        "forecaster": tuner.best_forecaster_,
        "best_params": tuner.best_params_,
        "best_score": tuner.best_score_,
        "refit_time": getattr(tuner, "refit_time_", 0.0),
    }

def _update(self, y, X=None, update_params=True):
"""Update time series to incremental training data.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
import pytest
from skbase.utils.dependencies import _check_soft_dependencies

if _check_soft_dependencies("sktime", severity="none"):
SKTIME_AVAILABLE = _check_soft_dependencies("sktime", severity="none")

if SKTIME_AVAILABLE:
from hyperactive.integrations.sktime import ForecastingOptCV, TSCOptCV

EST_TO_TEST = [ForecastingOptCV, TSCOptCV]
else:
EST_TO_TEST = []

pytestmark = pytest.mark.skipif(
not SKTIME_AVAILABLE, reason="sktime soft dependency not available"
)


@pytest.mark.parametrize("estimator", EST_TO_TEST)
def test_sktime_estimator(estimator):
Expand All @@ -20,3 +26,32 @@ def test_sktime_estimator(estimator):
check_estimator(estimator, raise_exceptions=True)
# The above line collects all API conformance tests in sktime and runs them.
# It will raise an error if the estimator is not API conformant.


def test_tune_by_instance_fallback_when_not_panel():
    """Check that ``tune_by_instance=True`` falls back on a univariate series."""
    import numpy as np
    import pandas as pd
    from sktime.forecasting.naive import NaiveForecaster
    from sktime.split import SingleWindowSplitter

    from hyperactive.opt.gridsearch import GridSearchSk

    horizon = [1]
    series = pd.Series(np.arange(24, dtype=float))

    fitted = ForecastingOptCV(
        forecaster=NaiveForecaster(strategy="last"),
        optimizer=GridSearchSk(param_grid={"window_length": [2, 4]}),
        cv=SingleWindowSplitter(fh=horizon, window_length=12),
        tune_by_instance=True,
    ).fit(series, fh=horizon)

    # A plain (non-broadcast) fit is expected: the usual tuning results are
    # present and no per-instance ``forecasters_`` table was created.
    assert isinstance(fitted.best_params_, dict)
    assert fitted.best_index_ == 0
    assert not hasattr(fitted, "forecasters_")
    assert fitted.refit_time_ >= 0.0