diff --git a/package/samplers/distributionally_robust_bo/LICENSE b/package/samplers/distributionally_robust_bo/LICENSE new file mode 100644 index 00000000..8ecbb22e --- /dev/null +++ b/package/samplers/distributionally_robust_bo/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Rishabh Dewangan + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/package/samplers/distributionally_robust_bo/README.md b/package/samplers/distributionally_robust_bo/README.md new file mode 100644 index 00000000..a6b0254f --- /dev/null +++ b/package/samplers/distributionally_robust_bo/README.md @@ -0,0 +1,61 @@ +--- +author: Rishabh Dewangan +title: Distributionally Robust BO Sampler +description: A sampler based on Distributionally Robust Bayesian Optimization for chance-constrained problems. +tags: [sampler] +optuna_versions: [3.6.1] +license: MIT License +--- + +## Abstract + +This package provides a sampler based on the algorithm proposed in **"Bayesian Optimization for Distributionally Robust Chance-constrained Problem"** (ICML 2022). + +Standard Bayesian Optimization assumes a known distribution of environmental variables. However, in many real-world scenarios (like manufacturing tolerances or financial markets), the true distribution is uncertain. This sampler uses a **Distributionally Robust Chance Constraint (DRCC)** to ensure that the objective function satisfies a specific threshold ($g(x, w) > h$) with at least a probability $\alpha$, even under the worst-case probability distribution within a defined ambiguity set. + +## APIs + +- `DistributionallyRobustSampler(*, epsilon_t: float = 0.15, h: float = 5.0, alpha: float = 0.53, seed: int | None = None)` + - `epsilon_t`: The radius of the ambiguity set. It defines how much the worst-case probability distribution can deviate from the reference distribution. A higher value means a more robust (but more conservative) optimization. + - `h`: The threshold value for the chance constraint. + - `alpha`: The minimum required probability that the function value will exceed the threshold `h`. + - `seed`: Seed for the random number generator, used primarily during the initial exploration phase. + +## Installation + +To use this sampler, ensure you have the following dependencies installed: + +```shell +$ pip install scipy numpy +``` + +## Example + +```python +import optuna +import optunahub + +def objective(trial: optuna.Trial) -> float: + # A simple 2D objective function + x = trial.suggest_float("x", -5.0, 5.0) + y = trial.suggest_float("y", -5.0, 5.0) + return x**2 + y**2 + +# Load the sampler from OptunaHub +sampler = optunahub.load_module( + package="samplers/distributionally_robust_bo" +).DistributionallyRobustSampler( + epsilon_t=0.15, + h=5.0, + alpha=0.53 +) + +study = optuna.create_study(sampler=sampler, direction="minimize") +study.optimize(objective, n_trials=30) + +print(study.best_trials) +``` + +## Reference + +Hideaki Imamura, et al. "Bayesian Optimization for Distributionally Robust Chance-constrained Problem." Proceedings of the 39th International Conference on Machine Learning (ICML). 2022. diff --git a/package/samplers/distributionally_robust_bo/__init__.py b/package/samplers/distributionally_robust_bo/__init__.py new file mode 100644 index 00000000..3f09c77e --- /dev/null +++ b/package/samplers/distributionally_robust_bo/__init__.py @@ -0,0 +1,4 @@ +from .sampler import DistributionallyRobustSampler + + +__all__ = ["DistributionallyRobustSampler"] diff --git a/package/samplers/distributionally_robust_bo/example.py b/package/samplers/distributionally_robust_bo/example.py new file mode 100644 index 00000000..442bc6be --- /dev/null +++ b/package/samplers/distributionally_robust_bo/example.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import optuna +import optunahub + + +def objective(trial: optuna.Trial) -> float: + x = trial.suggest_float("x", -5.0, 5.0) + y = trial.suggest_float("y", -5.0, 5.0) + return x**2 + y**2 + + +if __name__ == "__main__": + # Load the distributionally robust sampler from OptunaHub + sampler = optunahub.load_module( + package="samplers/distributionally_robust_bo" + ).DistributionallyRobustSampler(epsilon_t=0.15, h=5.0, alpha=0.53) + + study = optuna.create_study(sampler=sampler, direction="minimize") + study.optimize(objective, n_trials=30) + + print("\nBest trials:") + print(study.best_trials) diff --git a/package/samplers/distributionally_robust_bo/requirements.txt b/package/samplers/distributionally_robust_bo/requirements.txt new file mode 100644 index 00000000..415cdb13 --- /dev/null +++ b/package/samplers/distributionally_robust_bo/requirements.txt @@ -0,0 +1,2 @@ +scipy +numpy \ No newline at end of file diff --git a/package/samplers/distributionally_robust_bo/sampler.py b/package/samplers/distributionally_robust_bo/sampler.py new file mode 100644 index 00000000..2a468eb4 --- /dev/null +++ b/package/samplers/distributionally_robust_bo/sampler.py @@ -0,0 +1,203 @@ +from typing import Any +from typing import Dict +from typing import Optional + +import numpy as np +import numpy.typing as npt +import optuna +from optuna.distributions import BaseDistribution +from optuna.samplers import BaseSampler +from optuna.trial import TrialState +from scipy.optimize import linprog + + +def _compute_worst_case_probability( + indicator_above_h: npt.NDArray[np.float64], + ref_p: npt.NDArray[np.float64], + epsilon_t: float, +) -> float: + """Computes the worst-case probability for the chance constraint. + + Identifies the worst-case probability distribution within an ambiguity set + defined by epsilon_t using a linear program. + """ + grid_num = len(ref_p) + + # Variables structured as duals: [p_1^+, p_1^-, p_2^+, p_2^-, ..., p_n^+, p_n^-] + c = np.zeros(2 * grid_num) + c[0::2] = indicator_above_h + c[1::2] = -indicator_above_h + + a_eq = np.zeros((1, 2 * grid_num)) + a_eq[0, 0::2] = 1.0 + a_eq[0, 1::2] = -1.0 + b_eq = np.array([0.0]) + + d_mat = -np.eye(2 * grid_num) + b_d = np.zeros(2 * grid_num) + + f2_mat = np.ones((1, 2 * grid_num)) + b_f2 = np.array([epsilon_t]) + + f3_base = np.zeros((grid_num, 2 * grid_num)) + for i in range(grid_num): + f3_base[i, 2 * i] = 1.0 + f3_base[i, 2 * i + 1] = -1.0 + + # Flipping >= constraints to <= for scipy.optimize.linprog compatibility + a_f3_1 = -f3_base + b_f3_1 = -(np.ones(grid_num) - ref_p) + + a_f3_2 = f3_base + b_f3_2 = ref_p + + a_ub = np.vstack([d_mat, f2_mat, a_f3_1, a_f3_2]) + b_ub = np.concatenate([b_d, b_f2, b_f3_1, b_f3_2]) + + res = linprog(c, A_ub=a_ub, b_ub=b_ub, A_eq=a_eq, b_eq=b_eq, method="highs") + + if res.success: + ref_p_above = float(np.dot(indicator_above_h, ref_p)) + return float(res.fun + ref_p_above) + + return 0.0 + + +class DistributionallyRobustSampler(BaseSampler): + """A sampler based on Distributionally Robust Bayesian Optimization. + + Implements the algorithm proposed in "Bayesian Optimization for + Distributionally Robust Chance-constrained Problem" (ICML 2022). + """ + + def __init__( + self, + epsilon_t: float = 0.15, + h: float = 5.0, + alpha: float = 0.53, + seed: Optional[int] = None, + ) -> None: + self._epsilon_t = epsilon_t + self._h = h + self._alpha = alpha + self._rng = np.random.RandomState(seed) + self._random_sampler = optuna.samplers.RandomSampler(seed=seed) + + def infer_relative_search_space( + self, study: optuna.Study, trial: optuna.trial.FrozenTrial + ) -> Dict[str, BaseDistribution]: + return {} + + def sample_relative( + self, + study: optuna.Study, + trial: optuna.trial.FrozenTrial, + search_space: Dict[str, BaseDistribution], + ) -> Dict[str, Any]: + return {} + + def sample_independent( + self, + study: optuna.Study, + trial: optuna.trial.FrozenTrial, + param_name: str, + param_distribution: BaseDistribution, + ) -> Any: + completed_trials = study.get_trials(deepcopy=False, states=(TrialState.COMPLETE,)) + + x_train = [] + y_train = [] + for t in completed_trials: + if param_name in t.params and t.value is not None: + x_train.append(t.params[param_name]) + y_train.append(t.value) + + if len(x_train) < 5: + return self._random_sampler.sample_independent( + study, trial, param_name, param_distribution + ) + + x_train_np = np.array(x_train).reshape(-1, 1) + y_train_np = np.array(y_train) + + # Standardize targets to prevent matrix singularity + y_mean = np.mean(y_train_np) + y_std = np.std(y_train_np) + 1e-8 + y_scaled = (y_train_np - y_mean) / y_std + + # RBF Kernel with dynamic length scale + length_scale = np.std(x_train_np) + 1e-8 + sigma_f = 1.0 + sigma_n = 1e-4 + + def rbf_kernel(x1: np.ndarray, x2: np.ndarray) -> np.ndarray: + sqdist = np.sum(x1**2, 1).reshape(-1, 1) + np.sum(x2**2, 1) - 2 * np.dot(x1, x2.T) + return sigma_f * np.exp(-0.5 * sqdist / (length_scale**2)) + + k_mat = rbf_kernel(x_train_np, x_train_np) + sigma_n * np.eye(len(x_train_np)) + try: + k_inv = np.linalg.inv(k_mat) + except np.linalg.LinAlgError: + return self._random_sampler.sample_independent( + study, trial, param_name, param_distribution + ) + + if isinstance(param_distribution, optuna.distributions.FloatDistribution): + candidates = np.linspace(param_distribution.low, param_distribution.high, 50).reshape( + -1, 1 + ) + elif isinstance(param_distribution, optuna.distributions.IntDistribution): + candidates = np.arange(param_distribution.low, param_distribution.high + 1).reshape( + -1, 1 + ) + else: + return self._random_sampler.sample_independent( + study, trial, param_name, param_distribution + ) + + k_star = rbf_kernel(candidates, x_train_np) + k_star_star = rbf_kernel(candidates, candidates) + + post_mean_scaled = k_star @ k_inv @ y_scaled + post_mean = post_mean_scaled * y_std + y_mean + + post_var_scaled = np.diag(k_star_star - k_star @ k_inv @ k_star.T) + post_var = np.clip(post_var_scaled, 1e-8, None) * (y_std**2) + post_std = np.sqrt(post_var) + + beta = 2.0 + lcb = post_mean - beta * post_std + ucb = post_mean + beta * post_std + + grid_num = len(candidates) + ref_p = np.ones(grid_num) / grid_num + + lcb_above_h = (lcb > self._h).astype(np.float64) + ucb_above_h = (ucb > self._h).astype(np.float64) + + dist_ptr_lcb = _compute_worst_case_probability(lcb_above_h, ref_p, self._epsilon_t) + dist_ptr_ucb = _compute_worst_case_probability(ucb_above_h, ref_p, self._epsilon_t) + + # AF_num == 6 from ICML 2022 Proposed Algorithm + af_score = np.zeros(grid_num) + for i in range(grid_num): + if dist_ptr_lcb >= self._alpha: + af_score[i] = lcb[i] + else: + af_score[i] = ucb[i] - 1e5 * max(0, self._alpha - dist_ptr_ucb) + + # Inject micro-jitter to prevent AF collapse on identical scores + jitter = self._rng.normal(0, 1e-4, size=grid_num) + best_idx = int(np.argmax(af_score + jitter)) + best_candidate = candidates[best_idx][0] + + # Force exploration if GP gets stuck on a recent point + recent_values = x_train[-5:] if len(x_train) >= 5 else x_train + if best_candidate in recent_values: + top_k_indices = np.argsort(af_score)[-max(1, grid_num // 10) :] + best_idx = self._rng.choice(top_k_indices) + best_candidate = candidates[best_idx][0] + + if isinstance(param_distribution, optuna.distributions.IntDistribution): + return int(best_candidate) + return float(best_candidate)