diff --git a/examples/cuda/vector_add_custom_strategy.py b/examples/cuda/vector_add_custom_strategy.py
new file mode 100644
index 000000000..29d873d5d
--- /dev/null
+++ b/examples/cuda/vector_add_custom_strategy.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+"""This is the minimal example from the README"""
+
+import numpy
+import kernel_tuner
+from kernel_tuner import tune_kernel
+from kernel_tuner.file_utils import store_output_file, store_metadata_file
+
+def tune():
+
+    kernel_string = """
+    __global__ void vector_add(float *c, float *a, float *b, int n) {
+        int i = blockIdx.x * block_size_x + threadIdx.x;
+        if (i<n) {
+            c[i] = a[i] + b[i];
+        }
+    }
+    """
diff --git a/kernel_tuner/strategies/wrapper.py b/kernel_tuner/strategies/wrapper.py
new file mode 100644
--- /dev/null
+++ b/kernel_tuner/strategies/wrapper.py
+    def __call__(self, func: CostFunc, searchspace: Searchspace) -> tuple[tuple, float]:
+        """Optimize the black box function `func` within the given `searchspace`.
+
+        Args:
+            func (CostFunc): Cost function to be optimized. Has a property `budget_spent_fraction` that indicates how much of the budget has been spent.
+            searchspace (Searchspace): Search space containing the parameters to be optimized.
+
+        Returns:
+            tuple[tuple, float]: tuple of the best parameters and the corresponding cost value
+        """
+        pass
+
+
+class OptAlgWrapper:
+    """Wrapper class for user-defined optimization algorithms"""
+
+    def __init__(self, optimizer: OptAlg):
+        self.optimizer: OptAlg = optimizer
+
+    def tune(self, searchspace: Searchspace, runner, tuning_options):
+        cost_func = CostFunc(searchspace, tuning_options, runner, **self.optimizer.costfunc_kwargs)
+
+        if self.optimizer.costfunc_kwargs.get('scaling', True):
+            # Initialize costfunc for scaling
+            cost_func.get_bounds_x0_eps()
+
+        try:
+            self.optimizer(cost_func, searchspace)
+        except util.StopCriterionReached as e:
+            if tuning_options.verbose:
+                print(e)
+
+        return cost_func.results
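
A note on the OptAlg base class: the diff above shows OptAlgWrapper and the `__call__` interface, but not the top of kernel_tuner/strategies/wrapper.py. Judging from how OptAlgWrapper.tune uses `self.optimizer.costfunc_kwargs`, `CostFunc`, `Searchspace`, and `util.StopCriterionReached`, the file presumably opens roughly as sketched here; the import path for CostFunc and the default contents of costfunc_kwargs are assumptions, not taken from the patch.

# Hypothetical sketch of the top of kernel_tuner/strategies/wrapper.py, not the literal patch content
from kernel_tuner import util
from kernel_tuner.searchspace import Searchspace
from kernel_tuner.strategies.common import CostFunc  # assumed import path


class OptAlg:
    """Base class for user-defined optimization algorithms."""

    def __init__(self):
        # Keyword arguments that OptAlgWrapper.tune forwards to CostFunc;
        # subclasses may add entries such as 'scaling' (treated as True when absent).
        self.costfunc_kwargs = {}

    # ... followed by the __call__ method shown in the diff above
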
+ """ + if "max_fevals" in to: + if len(to.unique_results) >= to.max_fevals: + raise StopCriterionReached(f"max_fevals ({to.max_fevals}) reached") + return len(to.unique_results) / to.max_fevals + if "time_limit" in to: + time_spent = (time.perf_counter() - to.start_time) + (to.simulated_time * 1e-3) + if time_spent > to.time_limit: + raise StopCriterionReached("time limit exceeded") + return time_spent / to.time_limit + def check_tune_params_list(tune_params, observers, simulation_mode=False): diff --git a/test/test_custom_optimizer.py b/test/test_custom_optimizer.py new file mode 100644 index 000000000..cfc136d3c --- /dev/null +++ b/test/test_custom_optimizer.py @@ -0,0 +1,148 @@ + +### The following was generating using the LLaMEA prompt and OpenAI o1 + +import numpy as np + +from kernel_tuner.strategies.wrapper import OptAlg + +class HybridDELocalRefinement(OptAlg): + """ + A two-phase differential evolution with local refinement, intended for BBOB-type + black box optimization problems in [-5,5]^dim. + + One-line idea: A two-phase hybrid DE with local refinement that balances global + exploration and local exploitation under a strict function evaluation budget. + """ + + def __init__(self): + super().__init__() + # You can adjust these hyperparameters based on experimentation/tuning: + self.F = 0.8 # Differential weight + self.CR = 0.9 # Crossover probability + self.local_search_freq = 10 # Local refinement frequency in generations + + def __call__(self, func, searchspace): + """ + Optimize the black box function `func` in [-5,5]^dim, using + at most self.budget function evaluations. + + Returns: + best_params: np.ndarray representing the best parameters found + best_value: float representing the best objective value found + """ + self.dim = searchspace.num_params + self.population_size = round(min(min(50, 10 * self.dim), np.ceil(searchspace.size / 3))) # Caps for extremely large dim + + # 1. Initialize population + lower_bound, upper_bound = -5.0, 5.0 + pop = np.random.uniform(lower_bound, upper_bound, (self.population_size, self.dim)) + + # Evaluate initial population + evaluations = 0 + fitness = np.empty(self.population_size) + for i in range(self.population_size): + fitness[i] = func(pop[i]) + evaluations += 1 + + # Track best solution + best_idx = np.argmin(fitness) + best_params = pop[best_idx].copy() + best_value = fitness[best_idx] + + # 2. 
diff --git a/test/test_custom_optimizer.py b/test/test_custom_optimizer.py
new file mode 100644
index 000000000..cfc136d3c
--- /dev/null
+++ b/test/test_custom_optimizer.py
@@ -0,0 +1,148 @@
+
+### The following was generated using the LLaMEA prompt and OpenAI o1
+
+import numpy as np
+
+from kernel_tuner.strategies.wrapper import OptAlg
+
+class HybridDELocalRefinement(OptAlg):
+    """
+    A two-phase differential evolution with local refinement, intended for BBOB-type
+    black box optimization problems in [-5,5]^dim.
+
+    One-line idea: A two-phase hybrid DE with local refinement that balances global
+    exploration and local exploitation under a strict function evaluation budget.
+    """
+
+    def __init__(self):
+        super().__init__()
+        # You can adjust these hyperparameters based on experimentation/tuning:
+        self.F = 0.8                 # Differential weight
+        self.CR = 0.9                # Crossover probability
+        self.local_search_freq = 10  # Local refinement frequency in generations
+
+    def __call__(self, func, searchspace):
+        """
+        Optimize the black box function `func` in [-5,5]^dim, using
+        at most self.budget function evaluations.
+
+        Returns:
+            best_params: np.ndarray representing the best parameters found
+            best_value: float representing the best objective value found
+        """
+        self.dim = searchspace.num_params
+        # Caps for extremely large dim and for small search spaces
+        self.population_size = round(min(min(50, 10 * self.dim), np.ceil(searchspace.size / 3)))
+
+        # 1. Initialize population
+        lower_bound, upper_bound = -5.0, 5.0
+        pop = np.random.uniform(lower_bound, upper_bound, (self.population_size, self.dim))
+
+        # Evaluate initial population
+        evaluations = 0
+        fitness = np.empty(self.population_size)
+        for i in range(self.population_size):
+            fitness[i] = func(pop[i])
+            evaluations += 1
+
+        # Track best solution
+        best_idx = np.argmin(fitness)
+        best_params = pop[best_idx].copy()
+        best_value = fitness[best_idx]
+
+        # 2. Main evolutionary loop
+        gen = 0
+        while func.budget_spent_fraction < 1.0 and evaluations < searchspace.size:
+            gen += 1
+            for i in range(self.population_size):
+                # DE mutation: pick three distinct indices
+                idxs = np.random.choice(self.population_size, 3, replace=False)
+                a, b, c = pop[idxs]
+                mutant = a + self.F * (b - c)
+
+                # Crossover
+                trial = np.copy(pop[i])
+                crossover_points = np.random.rand(self.dim) < self.CR
+                trial[crossover_points] = mutant[crossover_points]
+
+                # Enforce bounds
+                trial = np.clip(trial, lower_bound, upper_bound)
+
+                # Evaluate trial
+                trial_fitness = func(trial)
+                evaluations += 1
+                if func.budget_spent_fraction > 1.0:
+                    # If out of budget, wrap up
+                    if trial_fitness < fitness[i]:
+                        pop[i] = trial
+                        fitness[i] = trial_fitness
+                        # Update global best
+                        if trial_fitness < best_value:
+                            best_value = trial_fitness
+                            best_params = trial.copy()
+                    break
+
+                # Selection
+                if trial_fitness < fitness[i]:
+                    pop[i] = trial
+                    fitness[i] = trial_fitness
+                    # Update global best
+                    if trial_fitness < best_value:
+                        best_value = trial_fitness
+                        best_params = trial.copy()
+
+            # Periodically refine best solution with a small local neighborhood search
+            if gen % self.local_search_freq == 0 and func.budget_spent_fraction < 1.0:
+                best_params, best_value, evaluations = self._local_refinement(
+                    func, best_params, best_value, evaluations, lower_bound, upper_bound
+                )
+
+        return best_params, best_value
+
+    def _local_refinement(self, func, best_params, best_value, evaluations, lb, ub):
+        """
+        Local refinement around the best solution found so far.
+        Uses a quick 'perturb-and-accept' approach in a shrinking neighborhood.
+        """
+        # Neighborhood size shrinks as the budget is consumed
+        step_size = 0.2 * (1.0 - func.budget_spent_fraction)
+
+        for _ in range(5):  # 5 refinements each time
+            if func.budget_spent_fraction >= 1.0:
+                break
+            candidate = best_params + np.random.uniform(-step_size, step_size, self.dim)
+            candidate = np.clip(candidate, lb, ub)
+            cand_value = func(candidate)
+            evaluations += 1
+            if cand_value < best_value:
+                best_value = cand_value
+                best_params = candidate.copy()
+
+        return best_params, best_value, evaluations
+
+
+### Testing the Optimization Algorithm Wrapper in Kernel Tuner
+import os
+from kernel_tuner import tune_kernel
+from kernel_tuner.strategies.wrapper import OptAlgWrapper
+
+from .test_runners import env
+
+cache_filename = os.path.dirname(os.path.realpath(__file__)) + "/test_cache_file.json"
+
+def test_OptAlgWrapper(env):
+    kernel_name, kernel_string, size, args, tune_params = env
+
+    # Instantiate the LLaMEA optimization algorithm
+    optimizer = HybridDELocalRefinement()
+
+    # Wrap the algorithm class in the OptAlgWrapper
+    # for use in Kernel Tuner
+    strategy = OptAlgWrapper(optimizer)
+    strategy_options = { 'max_fevals': 15 }
+
+    # Call the tuner
+    tune_kernel(kernel_name, kernel_string, size, args, tune_params,
+                strategy=strategy, strategy_options=strategy_options, cache=cache_filename,
+                simulation_mode=True, verbose=True)
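
Only the top of the new examples/cuda/vector_add_custom_strategy.py appears in the diff above; based on the README example and the test file, it presumably continues roughly along the lines of the sketch below. The RandomSearch optimizer, the [-5, 5] sampling convention (borrowed from HybridDELocalRefinement), and the block_size_x values are illustrative assumptions rather than the actual contents of the file.

#!/usr/bin/env python
"""Illustrative sketch: tuning vector_add with a user-defined optimizer via OptAlgWrapper."""

import numpy

from kernel_tuner import tune_kernel
from kernel_tuner.strategies.wrapper import OptAlg, OptAlgWrapper


class RandomSearch(OptAlg):
    """Toy optimizer: samples random points until the tuning budget is spent."""

    def __call__(self, func, searchspace):
        best_params, best_value = None, float("inf")
        # budget_spent_fraction comes from the CostFunc that OptAlgWrapper passes in;
        # once the budget is exhausted, calling func raises StopCriterionReached,
        # which OptAlgWrapper catches on our behalf.
        while func.budget_spent_fraction < 1.0:
            x = numpy.random.uniform(-5.0, 5.0, searchspace.num_params)  # same convention as the DE test
            y = func(x)
            if y < best_value:
                best_params, best_value = x.copy(), y
        return best_params, best_value


def tune():
    kernel_string = """
    __global__ void vector_add(float *c, float *a, float *b, int n) {
        int i = blockIdx.x * block_size_x + threadIdx.x;
        if (i<n) {
            c[i] = a[i] + b[i];
        }
    }
    """

    size = 10000000
    a = numpy.random.randn(size).astype(numpy.float32)
    b = numpy.random.randn(size).astype(numpy.float32)
    c = numpy.zeros_like(b)
    n = numpy.int32(size)
    args = [c, a, b, n]

    tune_params = {"block_size_x": [32, 64, 128, 256, 512]}

    # Pass the wrapped optimizer as the strategy, exactly as the test above does
    # (the real example presumably also stores results via store_output_file/store_metadata_file).
    strategy = OptAlgWrapper(RandomSearch())
    results, env = tune_kernel("vector_add", kernel_string, size, args, tune_params,
                               strategy=strategy, strategy_options={"max_fevals": 15})
    return results


if __name__ == "__main__":
    tune()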