Conversation

@fkiraly fkiraly commented Aug 24, 2025

This PR adds a SktimeForecastingTask, which defines a full benchmarking run for a forecaster that is passed later, at evaluation time, via the _evaluate params.

This object could be used as a "task" in the sktime ForecastingBenchmark.
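
A minimal usage sketch, assuming the score(params) entry point of BaseExperiment returns a (value, metadata) pair; the import path below is only a placeholder, not necessarily where the class will live:

# minimal usage sketch -- the import path is a placeholder, not the PR's actual module
from sktime.datasets import load_airline
from sktime.forecasting.naive import NaiveForecaster
from sktime.split import ExpandingWindowSplitter

from hyperactive.experiment.integrations import SktimeForecastingTask  # placeholder path

y = load_airline()
cv = ExpandingWindowSplitter(initial_window=36, step_length=12, fh=12)

task = SktimeForecastingTask(cv=cv, y=y)

# the forecaster is supplied at evaluation time, not at construction
score, metadata = task.score({"forecaster": NaiveForecaster(strategy="last")})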

Draft for discussion and design review:

  • it is quite similar to, and partially duplicative of, SktimeForecastingExperiment, which is used in tuning. How should we deal with the similarity and intersection?
    • we could merge the two into a single class, branching on whether a forecaster gets passed at construction or not (see the sketch after this list). Not sure where that leads, though.
  • is this a possible 1:1 drop-in (or almost) for the task object in sktime?
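
One possible shape for the merge, as a discussion aid only (class name and structure below are placeholders, not a proposal of the final API): forecaster becomes an optional __init__ argument, and _evaluate falls back to the score params when it is not set.

# discussion sketch only -- name and structure are placeholders
from hyperactive.base import BaseExperiment


class SktimeForecastingExperimentOrTask(BaseExperiment):
    """Single class covering both the tuning and the benchmarking use case."""

    def __init__(self, cv, y, X=None, forecaster=None, scoring=None):
        self.cv = cv
        self.y = y
        self.X = X
        self.forecaster = forecaster  # None means the forecaster comes via score params
        self.scoring = scoring
        super().__init__()

    def _paramnames(self):
        # tuning mode exposes the forecaster's hyperparameters,
        # task mode exposes the forecaster itself as the single search parameter
        if self.forecaster is not None:
            return list(self.forecaster.get_params().keys())
        return ["forecaster"]

    def _evaluate(self, params):
        if self.forecaster is not None:
            forecaster = self.forecaster.clone().set_params(**params)
        else:
            forecaster = params["forecaster"]
        ...  # run sktime's evaluate on forecaster, as in SktimeForecastingTask below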

@fkiraly fkiraly added the enhancement New feature or request label Aug 24, 2025

@arnavk23 arnavk23 left a comment

I made some corrections to your file here:

# copyright: hyperactive developers, MIT License (see LICENSE file)

import numpy as np

from hyperactive.base import BaseExperiment


class SktimeForecastingTask(BaseExperiment):
    """Experiment adapter for forecast backtesting benchmark run.

    This class is used to perform backtesting experiments using a given
    sktime forecaster. It allows for hyperparameter tuning and evaluation of
    the model's performance.

    The score returned is the summary backtesting score,
    of applying ``sktime`` ``evaluate`` to an estimator passed as ``forecaster``
    in the ``score`` ``params``.

    The backtesting performed is specified by the ``cv`` parameter,
    and the scoring metric is specified by the ``scoring`` parameter.
    The ``X`` and ``y`` parameters are the input data and target values,
    which are used in fit/predict cross-validation.

    Differs from ``SktimeForecastingExperiment`` in that ``forecaster``
    is passed as a parameter directly to ``score`` and not to ``__init__``.
    """

    _tags = {
        "authors": "fkiraly",
        "maintainers": "fkiraly",
        "python_dependencies": "sktime",  # python dependencies
    }

    def __init__(
        self,
        cv,
        y,
        X=None,
        strategy="refit",
        scoring=None,
        error_score=np.nan,
        cv_X=None,
        backend=None,
        backend_params=None,
    ):
        self.X = X
        self.y = y
        self.strategy = strategy
        self.scoring = scoring
        self.cv = cv
        self.error_score = error_score
        self.cv_X = cv_X
        self.backend = backend
        self.backend_params = backend_params

        super().__init__()

        if scoring is None:
            from sktime.performance_metrics.forecasting import (
                MeanAbsolutePercentageError,
            )

            self._scoring = MeanAbsolutePercentageError(symmetric=True)
        else:
            self._scoring = scoring

        # Set a boolean tag indicating whether higher scores are better.
        # The default sMAPE metric is lower-is-better; otherwise the metric's
        # own "lower_is_better" tag is queried.
        try:
            lower_is_better = (
                True
                if scoring is None
                else bool(self._scoring.get_tag("lower_is_better", False))
            )
        except Exception:
            # If the metric does not expose get_tag, assume higher is better
            # for custom metrics; the default metric remains lower-is-better.
            lower_is_better = True if scoring is None else False

        higher_is_better = not lower_is_better
        # Use a conventional boolean tag for the rest of the codebase
        try:
            self.set_tags(**{"higher_is_better": higher_is_better})
        except Exception:
            # If set_tags is not available or fails, ignore tagging but continue.
            pass

    def _paramnames(self):
        """Return the parameter names of the search."""
        return ["forecaster"]

    def _evaluate(self, params):
        """Evaluate the parameters.

        Parameters
        ----------
        params : dict with string keys
            Parameters to evaluate.

        Returns
        -------
        float
            The value of the parameters as per evaluation.
        dict
            Additional metadata about the search.
        """
        from sktime.forecasting.model_evaluation import evaluate

        forecaster = params.get("forecaster", None)
        if forecaster is None:
            raise ValueError("SktimeForecastingTask._evaluate requires params to include a 'forecaster' entry")

        try:
            results = evaluate(
                forecaster,
                cv=self.cv,
                y=self.y,
                X=self.X,
                strategy=self.strategy,
                scoring=self._scoring,
                error_score=self.error_score,
                cv_X=self.cv_X,
                backend=self.backend,
                backend_params=self.backend_params,
            )
        except Exception as e:
            # If user explicitly wants exceptions to propagate:
            if self.error_score == "raise":
                raise
            # Otherwise return error_score and capture the exception message
            return self.error_score, {"error": str(e)}

        # Determine scoring column name robustly
        scoring_name = (
            getattr(self._scoring, "name", None) or self._scoring.__class__.__name__
        )
        result_name = f"test_{scoring_name}"

        add_info = {"results": results}

        # Results handling robust to DataFrame-like or dict-like outputs
        try:
            # If results is a pandas DataFrame-like object:
            if hasattr(results, "columns"):
                if result_name in results.columns:
                    res_values = results[result_name]
                else:
                    # find a test_* column as fallback
                    test_cols = [c for c in results.columns if str(c).startswith("test_")]
                    if test_cols:
                        res_values = results[test_cols[0]]
                        add_info["warning"] = (
                            f"expected column '{result_name}' not found; using '{test_cols[0]}' instead"
                        )
                    else:
                        raise ValueError(f"No 'test_*' column found in evaluate results; expected '{result_name}'")
            else:
                # dict-like fallback
                if result_name in results:
                    res_values = results[result_name]
                else:
                    test_keys = [k for k in results.keys() if str(k).startswith("test_")]
                    if test_keys:
                        res_values = results[test_keys[0]]
                        add_info["warning"] = (
                            f"expected key '{result_name}' not found; using '{test_keys[0]}' instead"
                        )
                    else:
                        raise ValueError(f"No 'test_*' key found in evaluate results; expected '{result_name}'")
        except Exception as e:
            # Preserve original exception info
            if self.error_score == "raise":
                raise
            return self.error_score, {"error": str(e), **add_info}

        # Compute scalar summary result
        try:
            res_float = float(np.nanmean(res_values))
        except Exception:
            # Last-resort attempt: convert to numpy array and take mean
            try:
                res_float = float(np.nanmean(np.asarray(res_values)))
            except Exception as e:
                if self.error_score == "raise":
                    raise
                return self.error_score, {
                    "error": f"Could not compute mean of results: {e}",
                    **add_info,
                }

        return res_float, add_info

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the skbase object."""
        from sktime.datasets import load_airline, load_longley
        from sktime.split import ExpandingWindowSplitter

        y = load_airline()
        params0 = {
            "cv": ExpandingWindowSplitter(initial_window=36, step_length=12, fh=12),
            "y": y,
        }

        from sktime.performance_metrics.forecasting import MeanAbsolutePercentageError

        y, X = load_longley()
        params1 = {
            "cv": ExpandingWindowSplitter(initial_window=3, step_length=3, fh=1),
            "y": y,
            "X": X,
            "scoring": MeanAbsolutePercentageError(symmetric=False),
        }

        return [params0, params1]

    @classmethod
    def _get_score_params(cls):
        """Return settings for testing score/evaluate functions. Used in tests only."""
        from sktime.forecasting.naive import NaiveForecaster

        val0 = {"forecaster": NaiveForecaster(strategy="last")}
        val1 = {"forecaster": NaiveForecaster(strategy="last")}
        return [val0, val1]

fkiraly commented Nov 22, 2025

@arnavk23, can you kindly explain what you corrected and why?

arnavk23 commented Nov 22, 2025

@arnavk23, can you kindly explain what you corrected and why?

  1. Added validation for forecaster in params
    The original version assumed params["forecaster"] always existed.
    I added an explicit check and a clear error message because missing/incorrect parameters otherwise raise cryptic errors deeper inside sktime.evaluate.

  2. Made scoring metric handling more robust
    The previous code assumed that any scoring object implements get_tag("lower_is_better").
    I wrapped this in a try/except and added correct defaults for both cases (scoring=None or custom metrics).

  3. Safely applied the higher_is_better tag
    Previously, set_tags() was called without handling the case where it fails or is not supported; now such failures are caught and tagging is simply skipped.

  4. Improved parsing of the output from sktime.evaluate()
    The previous implementation assumed that the result is always a DataFrame and that the scoring column is always named exactly "test_<scoring.name>".
    I added support for both DataFrame-like and dict-like outputs, a fallback to the first available test_* column if the expected name isn't present, and a warning in the returned metadata whenever the fallback is used (a toy illustration follows this list).

  5. Better error handling during evaluate
    Previously, any exception inside evaluate() could crash the run or create inconsistent behavior.
    Now error_score="raise" preserves the expected raising behavior; otherwise the method returns (error_score, {"error": str(e)}) with the exception message captured.

  6. Robust conversion of results to a scalar
    The earlier implementation assumed you can always do float(results.mean()).
    I added the use of np.nanmean, a fallback to np.asarray if needed, and structured error reporting if even that fails.
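
To make point 4 concrete, a toy illustration of the column fallback on a hand-made results frame (not actual sktime.evaluate output):

import pandas as pd

# toy results frame: the expected metric column is missing,
# but another "test_*" column is present
results = pd.DataFrame(
    {"fit_time": [0.1, 0.2], "test_MeanAbsoluteError": [0.25, 0.75]}
)

result_name = "test_MeanAbsolutePercentageError"  # expected name, not present here
test_cols = [c for c in results.columns if str(c).startswith("test_")]
column = result_name if result_name in results.columns else test_cols[0]

print(column)                         # test_MeanAbsoluteError (fallback column)
print(float(results[column].mean()))  # 0.5 -- the scalar summary that _evaluate returns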
