diff --git a/examples/optimization/conditional_parameters.py b/examples/optimization/conditional_parameters.py index eaee9a1e..c82096b1 100644 --- a/examples/optimization/conditional_parameters.py +++ b/examples/optimization/conditional_parameters.py @@ -18,6 +18,7 @@ train_data, test_data = train_test_split(data, test_size=0.2, stratify=data["TARGET"], random_state=42) +# replacing default _sample function in OptunaTuner class with this function def sample(optimization_search_space, trial, suggested_params): trial_values = copy.copy(suggested_params) trial_values["feature_fraction"] = trial.suggest_uniform("feature_fraction", low=0.5, high=1.0) diff --git a/examples/optimization/custom_search_space.py b/examples/optimization/custom_search_space.py index 6cd5a058..8e9827a9 100644 --- a/examples/optimization/custom_search_space.py +++ b/examples/optimization/custom_search_space.py @@ -8,9 +8,8 @@ from sklearn.model_selection import train_test_split from lightautoml.automl.presets.tabular_presets import TabularAutoML -from lightautoml.ml_algo.tuning.base import Distribution -from lightautoml.ml_algo.tuning.base import SearchSpace from lightautoml.tasks import Task +from lightautoml.ml_algo.tuning.base import Uniform # load and prepare data @@ -22,8 +21,8 @@ task=Task("binary"), lgb_params={ "optimization_search_space": { - "feature_fraction": SearchSpace(Distribution.UNIFORM, low=0.5, high=1.0), - "min_sum_hessian_in_leaf": SearchSpace(Distribution.LOGUNIFORM, low=1e-3, high=10.0), + "feature_fraction": Uniform(low=0.5, high=1.0), + "min_sum_hessian_in_leaf": Uniform(low=1e-3, high=10.0, log=True), } }, ) diff --git a/examples/optimization/sequential_parameter_search.py b/examples/optimization/sequential_parameter_search.py index cf6e4693..54a313b9 100644 --- a/examples/optimization/sequential_parameter_search.py +++ b/examples/optimization/sequential_parameter_search.py @@ -12,13 +12,12 @@ from lightautoml.automl.presets.tabular_presets import TabularAutoML from lightautoml.tasks import Task - # load and prepare data data = pd.read_csv("./data/sampled_app_train.csv") train_data, test_data = train_test_split(data, test_size=0.2, stratify=data["TARGET"], random_state=42) -def sample(optimization_search_space, trial, suggested_params): +def sample(trial, suggested_params): trial_values = copy.copy(suggested_params) for feature_fraction in range(10): diff --git a/lightautoml/automl/presets/tabular_presets.py b/lightautoml/automl/presets/tabular_presets.py index 092b870e..a001434f 100755 --- a/lightautoml/automl/presets/tabular_presets.py +++ b/lightautoml/automl/presets/tabular_presets.py @@ -30,7 +30,6 @@ from ...ml_algo.dl_model import TorchModel from ...ml_algo.linear_sklearn import LinearLBFGS from ...ml_algo.random_forest import RandomForestSklearn -from ...ml_algo.tuning.optuna import DLOptunaTuner from ...ml_algo.tuning.optuna import OptunaTuner from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline from ...pipelines.features.lgb_pipeline import LGBSeqSimpleFeatures @@ -444,7 +443,7 @@ def get_nn( if tuned: nn_model.set_prefix("Tuned") - nn_tuner = DLOptunaTuner( + nn_tuner = OptunaTuner( n_trials=model_params["tuning_params"]["max_tuning_iter"], timeout=model_params["tuning_params"]["max_tuning_time"], fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"], diff --git a/lightautoml/automl/presets/text_presets.py b/lightautoml/automl/presets/text_presets.py index d79fdbb4..4458ab1d 100755 --- a/lightautoml/automl/presets/text_presets.py +++ 
b/lightautoml/automl/presets/text_presets.py @@ -22,7 +22,6 @@ from ...ml_algo.boost_lgbm import BoostLGBM from ...ml_algo.dl_model import TorchModel from ...ml_algo.linear_sklearn import LinearLBFGS -from ...ml_algo.tuning.optuna import DLOptunaTuner from ...ml_algo.tuning.optuna import OptunaTuner from ...pipelines.features.base import FeaturesPipeline from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline @@ -307,7 +306,7 @@ def get_nn( if tuned: nn_model.set_prefix("Tuned") - nn_tuner = DLOptunaTuner( + nn_tuner = OptunaTuner( n_trials=model_params["tuning_params"]["max_tuning_iter"], timeout=model_params["tuning_params"]["max_tuning_time"], fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"], diff --git a/lightautoml/ml_algo/base.py b/lightautoml/ml_algo/base.py index 5114d481..e50a80e4 100755 --- a/lightautoml/ml_algo/base.py +++ b/lightautoml/ml_algo/base.py @@ -8,6 +8,7 @@ from typing import Any from typing import Dict from typing import List +from typing import Callable from typing import Optional from typing import Sequence from typing import Tuple @@ -48,7 +49,13 @@ class MLAlgo(ABC): """ _default_params: Dict = {} - optimization_search_space: Dict = {} + + # Dict is a default search space representation that is used for simple cases + # Callable is used for complex cases like conditional search space as described in + # LightAutoML/examples/optimization/conditional_parameters.py + # Called in _get_objective function in OptunaTuner class + optimization_search_space: Union[Dict, Callable] = {} + # TODO: add checks here _fit_checks: Tuple = () _transform_checks: Tuple = () diff --git a/lightautoml/ml_algo/dl_model.py b/lightautoml/ml_algo/dl_model.py index e384d6bd..e6b5a63b 100644 --- a/lightautoml/ml_algo/dl_model.py +++ b/lightautoml/ml_algo/dl_model.py @@ -17,13 +17,13 @@ from typing import Dict import numpy as np -import optuna import pandas as pd import torch import torch.nn as nn from torch.optim.lr_scheduler import ReduceLROnPlateau +from .tuning.base import Uniform from ..dataset.np_pd_dataset import NumpyDataset from ..tasks.losses.torch import TorchLossWrapper from ..utils.installation import __validate_extra_deps @@ -618,7 +618,7 @@ def predict_single_fold(self, model: any, dataset: TabularDataset) -> np.ndarray return pred - def _default_sample(self, trial: optuna.trial.Trial, estimated_n_trials: int, suggested_params: Dict) -> Dict: + def _get_default_search_spaces(self, estimated_n_trials: int, suggested_params: Dict) -> Dict: """Implements simple tuning sampling strategy. 
 
         Args:
@@ -631,19 +631,18 @@ def _default_sample(self, trial: optuna.trial.Trial, estimated_n_trials: int, su
         """
         # optionally
-        trial_values = copy(suggested_params)
+        # trial_values = copy(suggested_params) # TODO: check how to use it
+        trial_values = {}
 
-        trial_values["bs"] = trial.suggest_categorical("bs", [2 ** i for i in range(6, 11)])
+        trial_values["bs"] = Uniform(low=64, high=1024, q=1)
 
-        weight_decay_bin = trial.suggest_categorical("weight_decay_bin", [0, 1])
+        weight_decay_bin = Uniform(low=0, high=1, q=1)
         if weight_decay_bin == 0:
             weight_decay = 0
         else:
-            weight_decay = trial.suggest_loguniform("weight_decay", low=1e-6, high=1e-2)
+            weight_decay = Uniform(low=1e-6, high=1e-2, log=True)
+
+        trial_values["lr"] = Uniform(low=1e-5, high=1e-1, log=True)
+        trial_values["weight_decay"] = weight_decay
 
-        lr = trial.suggest_loguniform("lr", low=1e-5, high=1e-1)
-        trial_values["opt_params"] = {
-            "lr": lr,
-            "weight_decay": weight_decay,
-        }
 
         return trial_values
diff --git a/lightautoml/ml_algo/tuning/optuna.py b/lightautoml/ml_algo/tuning/optuna.py
index 62e565df..031b065f 100644
--- a/lightautoml/ml_algo/tuning/optuna.py
+++ b/lightautoml/ml_algo/tuning/optuna.py
@@ -20,6 +20,7 @@ from .base import Uniform
 from ...validation.base import HoldoutIterator
 from ...validation.base import TrainValidIterator
+from ...ml_algo.dl_model import TorchModel
 from ...utils.logging import get_stdout_level
@@ -137,6 +138,7 @@ def fit(
         """
         assert not ml_algo.is_fitted, "Fitted algo cannot be tuned."
 
+        # optuna.logging.set_verbosity(logger.getEffectiveLevel())
        # upd timeout according to ml_algo timer
         estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator))
@@ -174,8 +176,20 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial
             )
 
         try:
+            self._is_nn = isinstance(ml_algo, TorchModel)
+            rows_num = train_valid_iterator.train.shape[0]
+
+            # get num of cpu for a process
+            num_cpu_per_process, n_jobs = self.allocate_resources_for_optuna_jobs(
+                overall_num_cpu=ml_algo.params["num_threads"], rows_num=rows_num, is_nn=self._is_nn
+            )
+
+            ml_algo.default_params[
+                "num_threads" # TODO: check if num_threads exists in every algo
+            ] = num_cpu_per_process # gets num of cpu here when making params for optuna optimisation
+
             # Custom progress bar
-            def custom_progress_bar(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
+            def custom_progress_bar(study: optuna.study.Study):
                 best_trial = study.best_trial
                 progress_bar.set_postfix(best_trial=best_trial.number, best_value=best_trial.value)
                 progress_bar.update(1)
@@ -195,6 +209,7 @@ def custom_progress_bar(study: optuna.study.Study, trial: optuna.trial.FrozenTri
                 ),
                 n_trials=self.n_trials,
                 timeout=self.timeout,
+                n_jobs=n_jobs,
                 callbacks=(
                     [update_trial_time, custom_progress_bar]
                     if get_stdout_level() in [logging.INFO, logging.INFO2]
                     else [update_trial_time]
                 ),
             )
@@ -206,9 +221,7 @@ def custom_progress_bar(study: optuna.study.Study, trial: optuna.trial.FrozenTri
             if get_stdout_level() in [logging.INFO, logging.INFO2]:
                 progress_bar.close()
 
-            # need to update best params here
-            self._best_params = self.study.best_params
-            ml_algo.params = self._best_params
+            ml_algo.params = self.study.best_params
 
             logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed")
             logger.info2(
                 f"The set of hyperparameters \x1b[1m{self._best_params}\x1b[0m\n achieve {self.study.best_value:.4f} {metric_name}"
             )
@@ -216,11 +229,16 @@ def custom_progress_bar(study: optuna.study.Study, trial: optuna.trial.FrozenTri
             if flg_new_iterator:
+                # set default_params back to normal
+                ml_algo.default_params["thread_count"] = ml_algo.params["thread_count"]
                 # if tuner was fitted on holdout set we dont need to save train results
                 return None, None
 
             preds_ds = ml_algo.fit_predict(train_valid_iterator)
 
+            # set default_params back to normal
+            ml_algo.default_params["thread_count"] = ml_algo.params["thread_count"]
+
             return ml_algo, preds_ds
         except optuna.exceptions.OptunaError:
             return None, None
@@ -247,7 +265,6 @@ def _get_objective(
         def objective(trial: optuna.trial.Trial) -> float:
             _ml_algo = deepcopy(ml_algo)
-
             optimization_search_space = _ml_algo.optimization_search_space
 
             if not optimization_search_space:
@@ -259,7 +276,7 @@ def objective(trial: optuna.trial.Trial) -> float:
             if callable(optimization_search_space):
                 _ml_algo.params = optimization_search_space(
                     trial=trial,
-                    optimization_search_space=optimization_search_space,
+                    optimization_search_space=None,
                     suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
                 )
             else:
@@ -271,7 +288,8 @@ def objective(trial: optuna.trial.Trial) -> float:
             output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator)
 
-            return _ml_algo.score(output_dataset)
+            score = _ml_algo.score(output_dataset)
+            return score
 
         return objective
@@ -298,236 +316,60 @@ def _sample(
             if not_supported:
                 raise ValueError(f"Optuna does not support distribution {search_space}")
 
+        if self._is_nn:
+            trial_values["opt_params"] = {
+                "lr": trial_values["lr"],
+                "weight_decay": trial_values["weight_decay"],
+            }
+
         return trial_values
 
     def plot(self):
         """Plot optimization history of all trials in a study."""
         return optuna.visualization.plot_optimization_history(self.study)
-
-class DLOptunaTuner(ParamsTuner):
-    """Wrapper for optuna tuner.
-
-    Args:
-        timeout: Maximum learning time.
-        n_trials: Maximum number of trials.
-        direction: Direction of optimization.
-            Set ``minimize`` for minimization
-            and ``maximize`` for maximization.
-        fit_on_holdout: Will be used holdout cv-iterator.
-        random_state: Seed for optuna sampler.
-
-    """
-
-    _name: str = "OptunaTuner"
-
-    study: optuna.study.Study = None
-    estimated_n_trials: int = None
-    mean_trial_time: Optional[int] = None
-
-    def __init__(
-        # TODO: For now, metric is designed to be greater is better. Change maximize param after metric refactor if needed
-        self,
-        timeout: Optional[int] = 1000,
-        n_trials: Optional[int] = 100,
-        direction: Optional[str] = "maximize",
-        fit_on_holdout: bool = True,
-        random_state: int = 42,
-    ):
-        self.timeout = timeout
-        self.n_trials = n_trials
-        self.estimated_n_trials = n_trials
-        self.direction = direction
-        self._fit_on_holdout = fit_on_holdout
-        self.random_state = random_state
-
-    def _upd_timeout(self, timeout):
-        self.timeout = min(self.timeout, timeout)
-
-    def fit(
-        self,
-        ml_algo: TunableAlgo,
-        train_valid_iterator: Optional[TrainValidIterator] = None,
-    ) -> Tuple[Optional[TunableAlgo], Optional[LAMLDataset]]:
-        """Tune model.
+    def allocate_resources_for_optuna_jobs(self, overall_num_cpu: int, rows_num: int, is_nn: bool = False):
+        """Get the number of CPUs needed per process and the number of processes, taking the length of the dataset into account.
 
         Args:
-            ml_algo: Algo that is tuned.
-            train_valid_iterator: Classic cv-iterator.
+            overall_num_cpu (int): Maximum number of CPUs available.
+            rows_num (int): Length of the dataset.
+            is_nn (bool, optional): Whether the task is a neural network task. Defaults to False.
 
         Returns:
-            Tuple (None, None) if an optuna exception raised
-            or ``fit_on_holdout=True`` and ``train_valid_iterator`` is
-            not :class:`~lightautoml.validation.base.HoldoutIterator`.
-            Tuple (MlALgo, preds_ds) otherwise.
-
+            tuple: An empirical number of CPU for a process that works better for a specific dataset length,
+                and the number of processes.
         """
-        assert not ml_algo.is_fitted, "Fitted algo cannot be tuned."
-        self._params_scores = []
+        if is_nn:
+            return overall_num_cpu, 1 # TODO: test optuna parallelisation for nn
 
-        # optuna.logging.set_verbosity(get_stdout_level())
-        # upd timeout according to ml_algo timer
-        estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator))
-        if estimated_tuning_time:
-            # TODO: Check for minimal runtime!
-            estimated_tuning_time = max(estimated_tuning_time, 1)
-            self._upd_timeout(estimated_tuning_time)
-
-        logger.info(
-            f"Start hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m ... Time budget is {self.timeout:.2f} secs"
-        )
-
-        metric_name = train_valid_iterator.train.task.get_dataset_metric().name
-        ml_algo = deepcopy(ml_algo)
-
-        flg_new_iterator = False
-        if self._fit_on_holdout and type(train_valid_iterator) != HoldoutIterator:
-            train_valid_iterator = train_valid_iterator.convert_to_holdout_iterator()
-            flg_new_iterator = True
-
-        # TODO: Check if time estimation will be ok with multiprocessing
-        def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
-            """Callback for number of iteration with time cut-off.
+        def split_cpus(n_cpu_per_job: int):
+            """Helper function to split available CPUs between Optuna jobs.
 
             Args:
-                study: Optuna study object.
-                trial: Optuna trial object.
-
-            """
-            ml_algo.mean_trial_time = study.trials_dataframe()["duration"].mean().total_seconds()
-            self.estimated_n_trials = min(self.n_trials, self.timeout // ml_algo.mean_trial_time)
-
-            logger.info3(
-                f"\x1b[1mTrial {len(study.trials)}\x1b[0m with hyperparameters {trial.params} scored {trial.value} in {trial.duration}"
-            )
-
-        try:
-            # Custom progress bar
-            def custom_progress_bar(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
-                best_trial = study.best_trial
-                progress_bar.set_postfix(best_trial=best_trial.number, best_value=best_trial.value)
-                progress_bar.update(1)
-
-            # Initialize progress bar
-            if get_stdout_level() in [logging.INFO, logging.INFO2]:
-                progress_bar = tqdm(total=self.n_trials, desc="Optimization Progress")
+                n_cpu_per_job (int): expected number of cpu for a job
 
-            sampler = optuna.samplers.TPESampler(seed=self.random_state)
-            self.study = optuna.create_study(direction=self.direction, sampler=sampler)
-
-            self.study.optimize(
-                func=self._get_objective(
-                    ml_algo=ml_algo,
-                    estimated_n_trials=self.estimated_n_trials,
-                    train_valid_iterator=train_valid_iterator,
-                ),
-                n_trials=self.n_trials,
-                timeout=self.timeout,
-                callbacks=(
-                    [update_trial_time, custom_progress_bar]
-                    if get_stdout_level() in [logging.INFO, logging.INFO2]
-                    else [update_trial_time]
-                ),
-            )
-
-            # Close the progress bar if it was initialized
-            if get_stdout_level() in [logging.INFO, logging.INFO2]:
-                progress_bar.close()
-
-            # need to update best params here
-            if self.direction == "maximize":
-                self._best_params = max(self._params_scores, key=lambda x: x[1])[0]
-            else:
-                self._best_params = min(self._params_scores, key=lambda x: x[1])[0]
-
-            ml_algo.params = self._best_params
-            del self._params_scores
-
-            logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed")
-            logger.info2(
-                f"The set of hyperparameters \x1b[1m{self._best_params}\x1b[0m\n achieve {self.study.best_value:.4f} {metric_name}"
-            )
-
-            if flg_new_iterator:
-                # if tuner was fitted on holdout set we dont need to save train results
-                return None, None
-
-            preds_ds = ml_algo.fit_predict(train_valid_iterator)
-
-            return ml_algo, preds_ds
-        except optuna.exceptions.OptunaError:
-            del self._params_scores
-            return None, None
-
-    def _get_objective(
-        self,
-        ml_algo: TunableAlgo,
-        estimated_n_trials: int,
-        train_valid_iterator: TrainValidIterator,
-    ) -> Callable[[optuna.trial.Trial], Union[float, int]]:
-        """Get objective.
-
-        Args:
-            ml_algo: Tunable algorithm.
-            estimated_n_trials: Maximum number of hyperparameter estimations.
-            train_valid_iterator: Used for getting parameters
-                depending on dataset.
+            Returns:
+                num_cpu_per_process (int): final number of cpu for a job
+                n_jobs (int): number of jobs for optuna
 
-        Returns:
-            Callable objective.
-
-        """
-        assert isinstance(ml_algo, MLAlgo)
-
-        def objective(trial: optuna.trial.Trial) -> float:
-            _ml_algo = deepcopy(ml_algo)
-
-            optimization_search_space = _ml_algo.optimization_search_space
-            if not optimization_search_space:
-                optimization_search_space = _ml_algo._default_sample
-
-            if callable(optimization_search_space):
-                sampled_params = optimization_search_space(
-                    trial=trial,
-                    estimated_n_trials=estimated_n_trials,
-                    suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
-                )
+            """
+            # if num of cpu we have is less than 2*num_cpu needed for a process then just use one job
+            if overall_num_cpu <= n_cpu_per_job * 2 - 1:
+                num_cpu_per_process = overall_num_cpu
+                n_jobs = 1
             else:
-                sampled_params = self._sample(
-                    trial=trial,
-                    optimization_search_space=optimization_search_space,
-                    suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
-                )
-
-            _ml_algo.params = sampled_params
-            output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator)
-            score = _ml_algo.score(output_dataset)
-            self._params_scores.append((sampled_params, score))
-            return score
-
-        return objective
-
-    def _sample(
-        self,
-        optimization_search_space,
-        trial: optuna.trial.Trial,
-        suggested_params: dict,
-    ) -> dict:
-        # logger.info3(f'Suggested parameters: {suggested_params}')
-        trial_values = copy(suggested_params)
-        for parameter_name, search_space in optimization_search_space.items():
-            not_supported = True
-            for key_class in OPTUNA_DISTRIBUTIONS_MAP:
-                if isinstance(search_space, key_class):
-                    wrapped_search_space = OPTUNA_DISTRIBUTIONS_MAP[key_class](search_space)
-                    trial_values[parameter_name] = wrapped_search_space(
-                        name=parameter_name,
-                        trial=trial,
-                    )
-                    not_supported = False
-            if not_supported:
-                raise ValueError(f"Optuna does not support distribution {search_space}")
+                num_cpu_per_process = n_cpu_per_job
+                n_jobs = overall_num_cpu // num_cpu_per_process
+            return num_cpu_per_process, n_jobs
+
+        if rows_num <= 50_000:
+            num_cpu_per_process, n_jobs = split_cpus(2)
+        elif rows_num <= 1_000_000:
+            num_cpu_per_process, n_jobs = split_cpus(4)
+        elif rows_num <= 5_000_000:
+            num_cpu_per_process, n_jobs = split_cpus(8)
+        else:
+            num_cpu_per_process, n_jobs = split_cpus(16)
-
-    def plot(self):
-        """Plot optimization history of all trials in a study."""
-        return optuna.visualization.plot_optimization_history(self.study)
+
+        return num_cpu_per_process, n_jobs
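
For context, the CPU-splitting heuristic that the new allocate_resources_for_optuna_jobs method implements can be illustrated with the standalone sketch below. It is illustrative only: the helper name allocate_cpus and the example calls are hypothetical and not part of the diff; the thresholds and the single-job fallback mirror the added code above.

# Standalone sketch (hypothetical helper, not part of the diff) of the allocation
# heuristic used by OptunaTuner.allocate_resources_for_optuna_jobs.
def allocate_cpus(overall_num_cpu: int, rows_num: int, is_nn: bool = False):
    """Return (num_cpu_per_process, n_jobs) for parallel Optuna trials."""
    if is_nn:
        # parallel Optuna jobs are not enabled for neural networks yet
        return overall_num_cpu, 1

    # empirical per-trial CPU budget, growing with dataset size
    if rows_num <= 50_000:
        n_cpu_per_job = 2
    elif rows_num <= 1_000_000:
        n_cpu_per_job = 4
    elif rows_num <= 5_000_000:
        n_cpu_per_job = 8
    else:
        n_cpu_per_job = 16

    # if fewer than two full trials fit into the available CPUs, run a single job
    if overall_num_cpu <= n_cpu_per_job * 2 - 1:
        return overall_num_cpu, 1
    return n_cpu_per_job, overall_num_cpu // n_cpu_per_job


print(allocate_cpus(overall_num_cpu=16, rows_num=300_000))   # (4, 4): four parallel trials, 4 CPUs each
print(allocate_cpus(overall_num_cpu=4, rows_num=2_000_000))  # (4, 1): not enough CPUs for two 8-CPU trials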