diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py
index b711388609..b548f519e0 100644
--- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py
+++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py
@@ -242,7 +242,6 @@ def __exit__(
         self._in_context = False
         return False  # Do not suppress exceptions
 
-    @abstractmethod
     def start(self) -> None:
         """Start the scheduling loop."""
         assert self.experiment is not None
@@ -257,13 +256,33 @@ def start(self) -> None:
 
         if self._config_id > 0:
             tunables = self.load_tunable_config(self._config_id)
-            self.schedule_trial(tunables)
+            # If a config_id is provided, assume it is expected to be run immediately.
+            self.add_trial_to_queue(tunables, ts_start=datetime.now(UTC))
+
+        is_warm_up: bool = self.optimizer.supports_preload
+        if not is_warm_up:
+            _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)
+
+        not_done: bool = True
+        while not_done:
+            _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
+            self.run_schedule(is_warm_up)
+            not_done = self.add_new_optimizer_suggestions()
+            self.assign_trial_runners(
+                self.experiment.pending_trials(
+                    datetime.now(UTC),
+                    running=False,
+                    trial_runner_assigned=False,
+                )
+            )
+            is_warm_up = False
 
     def teardown(self) -> None:
         """
         Tear down the TrialRunners/Environment(s).
 
-        Call it after the completion of the `.start()` in the scheduler context.
+        Call it after the completion of the :py:meth:`Scheduler.start` in the
+        Scheduler context.
         """
         assert self.experiment is not None
         if self._do_teardown:
@@ -290,13 +309,20 @@ def load_tunable_config(self, config_id: int) -> TunableGroups:
         _LOG.debug("Config %d ::\n%s", config_id, json.dumps(tunable_values, indent=2))
         return tunables.copy()
 
-    def _schedule_new_optimizer_suggestions(self) -> bool:
+    def add_new_optimizer_suggestions(self) -> bool:
         """
         Optimizer part of the loop.
 
-        Load the results of the executed trials into the optimizer, suggest new
-        configurations, and add them to the queue. Return True if optimization is not
-        over, False otherwise.
+        Load the results of the executed trials into the
+        :py:class:`~.Optimizer`, suggest new configurations, and add them to the
+        queue.
+
+        Returns
+        -------
+        bool
+            True if the optimization process should continue to get new
+            suggestions from the Optimizer, False otherwise.
+            See also :py:meth:`~.Scheduler.not_done`.
         """
         assert self.experiment is not None
         (trial_ids, configs, scores, status) = self.experiment.load(self._last_trial_id)
@@ -304,40 +330,67 @@ def _schedule_new_optimizer_suggestions(self) -> bool:
         self.optimizer.bulk_register(configs, scores, status)
         self._last_trial_id = max(trial_ids, default=self._last_trial_id)
 
+        # Check if the optimizer has converged or not.
         not_done = self.not_done()
         if not_done:
             tunables = self.optimizer.suggest()
-            self.schedule_trial(tunables)
-
+            self.add_trial_to_queue(tunables)
         return not_done
 
-    def schedule_trial(self, tunables: TunableGroups) -> None:
-        """Add a configuration to the queue of trials."""
-        # TODO: Alternative scheduling policies may prefer to expand repeats over
-        # time as well as space, or adjust the number of repeats (budget) of a given
-        # trial based on whether initial results are promising.
+    def add_trial_to_queue(
+        self,
+        tunables: TunableGroups,
+        ts_start: datetime | None = None,
+    ) -> None:
+        """
+        Add a configuration to the queue of trials one or more times.
+
+        (e.g., according to the :py:attr:`~.Scheduler.trial_config_repeat_count`)
+
+        Parameters
+        ----------
+        tunables : TunableGroups
+            The tunable configuration to add to the queue.
+
+        ts_start : datetime.datetime | None
+            Optional timestamp at which to start the trial.
+
+        Notes
+        -----
+        Alternative scheduling policies may prefer to expand repeats over
+        time as well as space, or adjust the number of repeats (budget) of a given
+        trial based on whether initial results are promising.
+        """
         for repeat_i in range(1, self._trial_config_repeat_count + 1):
             self._add_trial_to_queue(
                 tunables,
-                config={
-                    # Add some additional metadata to track for the trial such as the
-                    # optimizer config used.
-                    # Note: these values are unfortunately mutable at the moment.
-                    # Consider them as hints of what the config was the trial *started*.
-                    # It is possible that the experiment configs were changed
-                    # between resuming the experiment (since that is not currently
-                    # prevented).
-                    "optimizer": self.optimizer.name,
-                    "repeat_i": repeat_i,
-                    "is_defaults": tunables.is_defaults(),
-                    **{
-                        f"opt_{key}_{i}": val
-                        for (i, opt_target) in enumerate(self.optimizer.targets.items())
-                        for (key, val) in zip(["target", "direction"], opt_target)
-                    },
-                },
+                ts_start=ts_start,
+                config=self._augment_trial_config_metadata(tunables, repeat_i),
             )
 
+    def _augment_trial_config_metadata(
+        self,
+        tunables: TunableGroups,
+        repeat_i: int,
+    ) -> dict[str, Any]:
+        return {
+            # Add some additional metadata to track for the trial such as the
+            # optimizer config used.
+            # Note: these values are unfortunately mutable at the moment.
+            # Consider them as hints of what the config was when the trial *started*.
+            # It is possible that the experiment configs were changed
+            # between resuming the experiment (since that is not currently
+            # prevented).
+            "optimizer": self.optimizer.name,
+            "repeat_i": repeat_i,
+            "is_defaults": tunables.is_defaults(),
+            **{
+                f"opt_{key}_{i}": val
+                for (i, opt_target) in enumerate(self.optimizer.targets.items())
+                for (key, val) in zip(["target", "direction"], opt_target)
+            },
+        }
+
     def _add_trial_to_queue(
         self,
         tunables: TunableGroups,
@@ -355,10 +408,11 @@ def _add_trial_to_queue(
 
     def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None:
         """
-        Assigns TrialRunners to the given Trial in batch.
+        Assigns a :py:class:`~.TrialRunner` to each :py:class:`~.Storage.Trial` in the
+        batch.
 
-        The base class implements a simple round-robin scheduling algorithm for each
-        Trial in sequence.
+        The base class implements a simple round-robin scheduling algorithm for
+        each Trial in sequence.
 
         Subclasses can override this method to implement a more sophisticated
         policy. For instance::
@@ -378,6 +432,11 @@ def assign_trial_runners(
                     trial.set_trial_runner(trial_runner)
                 ...
 
+        Notes
+        -----
+        Subclasses are *not* required to assign a TrialRunner to every Trial
+        (e.g., if the Trial should be deferred to a later time).
+
         Parameters
         ----------
         trials : Iterable[Storage.Trial]
@@ -414,7 +473,8 @@ def assign_trial_runners(
 
     def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner:
         """
-        Gets the TrialRunner associated with the given Trial.
+        Gets the :py:class:`~.TrialRunner` associated with the given
+        :py:class:`~.Storage.Trial`.
 
         Parameters
         ----------
@@ -437,25 +497,45 @@ def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner:
         assert trial_runner.trial_runner_id == trial.trial_runner_id
         return trial_runner
 
-    def _run_schedule(self, running: bool = False) -> None:
+    def run_schedule(self, running: bool = False) -> None:
         """
-        Scheduler part of the loop.
+        Runs the current schedule of trials.
+
+        Check for :py:class:`~.Storage.Trial` instances with :py:attr:`~.Status.PENDING`
+        and an assigned :py:attr:`~.Storage.Trial.trial_runner_id` in the queue and run
+        them with :py:meth:`~.Scheduler.run_trial`.
+
+        Subclasses can override this method to implement a more sophisticated
+        scheduling policy.
 
-        Check for pending trials in the queue and run them.
+        Parameters
+        ----------
+        running : bool
+            If True, run the trials that are already in a "running" state (e.g., to
+            resume them). If False (default), run the trials that are pending.
         """
         assert self.experiment is not None
-        # Make sure that any pending trials have a TrialRunner assigned.
-        pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running))
-        self.assign_trial_runners(pending_trials)
+        pending_trials = list(
+            self.experiment.pending_trials(
+                datetime.now(UTC),
+                running=running,
+                trial_runner_assigned=True,
+            )
+        )
         for trial in pending_trials:
+            assert (
+                trial.trial_runner_id is not None
+            ), f"Trial {trial} has no TrialRunner assigned yet."
             self.run_trial(trial)
 
     def not_done(self) -> bool:
         """
         Check the stopping conditions.
 
-        By default, stop when the optimizer converges or max limit of trials reached.
+        By default, stop when the :py:class:`.Optimizer` converges or the limit
+        of :py:attr:`~.Scheduler.max_trials` is reached.
         """
+        # TODO: Add more stopping conditions: https://github.com/microsoft/MLOS/issues/427
         return self.optimizer.not_converged() and (
             self._trial_count < self._max_trials or self._max_trials <= 0
         )
diff --git a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py
index f450b28b8f..1f3a12e828 100644
--- a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py
+++ b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py
@@ -15,24 +15,10 @@ class SyncScheduler(Scheduler):
 
     """A simple single-threaded synchronous optimization loop implementation."""
 
-    def start(self) -> None:
-        """Start the optimization loop."""
-        super().start()
-
-        is_warm_up = self.optimizer.supports_preload
-        if not is_warm_up:
-            _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)
-
-        not_done = True
-        while not_done:
-            _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
-            self._run_schedule(is_warm_up)
-            not_done = self._schedule_new_optimizer_suggestions()
-            is_warm_up = False
-
     def run_trial(self, trial: Storage.Trial) -> None:
         """
-        Set up and run a single trial.
+        Set up and run a single :py:class:`~.Storage.Trial` on its
+        :py:class:`~.TrialRunner`.
 
         Save the results in the storage.
""" diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 80eb696bc6..63c15a0e1f 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -20,6 +20,7 @@ from mlos_bench.services.types import SupportsConfigLoading from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups +from mlos_bench.tunables.tunable_types import TunableValue _LOG = logging.getLogger(__name__) @@ -168,7 +169,7 @@ def run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, - ) -> None: + ) -> tuple[Status, datetime, dict[str, TunableValue] | None]: """ Run a single trial on this TrialRunner's Environment and stores the results in the backend Trial Storage. @@ -198,9 +199,10 @@ def run_trial( if not self.environment.setup(trial.tunables, trial.config(global_config)): _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) # FIXME: Use the actual timestamp from the environment. - _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, Status.FAILED) - trial.update(Status.FAILED, datetime.now(UTC)) - return + (status, timestamp, results) = (Status.FAILED, datetime.now(UTC), None) + _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, status) + trial.update(status, timestamp) + return (status, timestamp, results) # TODO: start background status polling of the environments in the event loop. @@ -221,6 +223,8 @@ def run_trial( self._is_running = False + return (status, timestamp, results) + def teardown(self) -> None: """ Tear down the Environment. diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index f2d393994f..52ecf93b7d 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -313,18 +313,26 @@ def pending_trials( timestamp: datetime, *, running: bool, + trial_runner_assigned: bool | None = None, ) -> Iterator["Storage.Trial"]: """ - Return an iterator over the pending trials that are scheduled to run on or - before the specified timestamp. + Return an iterator over :py:attr:`~.Status.PENDING` + :py:class:`~.Storage.Trial` instances that have a scheduled start time to + run on or before the specified timestamp. Parameters ---------- timestamp : datetime.datetime - The time in UTC to check for scheduled trials. + The time in UTC to check for scheduled Trials. running : bool - If True, include the trials that are already running. + If True, include the Trials that are also + :py:attr:`~.Status.RUNNING` or :py:attr:`~.Status.READY`. Otherwise, return only the scheduled trials. + trial_runner_assigned : bool | None + If True, include the Trials that are assigned to a + :py:class:`~.TrialRunner`. If False, return only the trials + that are not assigned to any :py:class:`~.TrialRunner`. + If None, return all trials regardless of their assignment. 
         Returns
         -------
diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py
index eb47de7d71..2d7db8e34e 100644
--- a/mlos_bench/mlos_bench/storage/sql/experiment.py
+++ b/mlos_bench/mlos_bench/storage/sql/experiment.py
@@ -235,25 +235,35 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d
             row._tuple() for row in cur_result.fetchall()  # pylint: disable=protected-access
         )
 
-    def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
+    def pending_trials(
+        self,
+        timestamp: datetime,
+        *,
+        running: bool = False,
+        trial_runner_assigned: bool | None = None,
+    ) -> Iterator[Storage.Trial]:
         timestamp = utcify_timestamp(timestamp, origin="local")
         _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
         if running:
-            pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name]
+            statuses = [Status.PENDING, Status.READY, Status.RUNNING]
         else:
-            pending_status = [Status.PENDING.name]
+            statuses = [Status.PENDING]
         with self._engine.connect() as conn:
-            cur_trials = conn.execute(
-                self._schema.trial.select().where(
-                    self._schema.trial.c.exp_id == self._experiment_id,
-                    (
-                        self._schema.trial.c.ts_start.is_(None)
-                        | (self._schema.trial.c.ts_start <= timestamp)
-                    ),
-                    self._schema.trial.c.ts_end.is_(None),
-                    self._schema.trial.c.status.in_(pending_status),
-                )
+            stmt = self._schema.trial.select().where(
+                self._schema.trial.c.exp_id == self._experiment_id,
+                (
+                    self._schema.trial.c.ts_start.is_(None)
+                    | (self._schema.trial.c.ts_start <= timestamp)
+                ),
+                self._schema.trial.c.ts_end.is_(None),
+                self._schema.trial.c.status.in_([s.name for s in statuses]),
             )
+            if trial_runner_assigned:
+                stmt = stmt.where(self._schema.trial.c.trial_runner_id.isnot(None))
+            elif trial_runner_assigned is False:
+                stmt = stmt.where(self._schema.trial.c.trial_runner_id.is_(None))
+            # else:  # No filtering by trial_runner_id
+            cur_trials = conn.execute(stmt)
             for trial in cur_trials.fetchall():
                 tunables = self._get_key_val(
                     conn,
diff --git a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py
index aaf545c787..faefe4998b 100644
--- a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py
+++ b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py
@@ -61,6 +61,53 @@ def test_schedule_trial(
     trial_now2_data = exp_data.trials[trial_now2.trial_id]
     assert trial_now2_data.trial_runner_id == trial_now2.trial_runner_id
 
+    # --- Test the trial_runner_assigned parameter ---
+    # At this point:
+    # - trial_now1: no trial_runner assigned
+    # - trial_now2: trial_runner assigned
+    # - trial_1h, trial_2h: no trial_runner assigned
+
+    # All pending trials (should include all 4)
+    all_pending = _trial_ids(
+        exp_storage.pending_trials(
+            timestamp + timedelta_1hr * 3,
+            running=False,
+            trial_runner_assigned=None,
+        )
+    )
+    assert all_pending == {
+        trial_now1.trial_id,
+        trial_now2.trial_id,
+        trial_1h.trial_id,
+        trial_2h.trial_id,
+    }, f"Expected all pending trials, got {all_pending}"
+
+    # Only those with a trial_runner assigned
+    assigned_pending = _trial_ids(
+        exp_storage.pending_trials(
+            timestamp + timedelta_1hr * 3,
+            running=False,
+            trial_runner_assigned=True,
+        )
+    )
+    assert assigned_pending == {
+        trial_now2.trial_id
+    }, f"Expected only trials with a runner assigned, got {assigned_pending}"
+
+    # Only those without a trial_runner assigned
+    unassigned_pending = _trial_ids(
+        exp_storage.pending_trials(
+            timestamp + timedelta_1hr * 3,
+            running=False,
+            trial_runner_assigned=False,
+        )
+    )
+    assert unassigned_pending == {
+        trial_now1.trial_id,
+        trial_1h.trial_id,
+        trial_2h.trial_id,
+    }, f"Expected only trials without a runner assigned, got {unassigned_pending}"
+
     # Scheduler side: get trials ready to run at certain timestamps:
 
     # Pretend 1 minute has passed, get trials scheduled to run:
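
The reworked `Scheduler.start()` above boils down to a three-step loop over the new public APIs (`run_schedule`, `add_new_optimizer_suggestions`, `assign_trial_runners`) plus the `trial_runner_assigned` filter on `Experiment.pending_trials`. The following is a minimal sketch of that loop from the caller's point of view; it mirrors the diff rather than adding behavior, and it assumes a concrete `scheduler` object that has already been entered as a context manager, exposes its `optimizer` and open `experiment` as attributes, and runs on Python 3.11+ for `datetime.UTC`:

    from datetime import UTC, datetime

    # Warm-up only applies if the Optimizer can preload previously recorded results.
    is_warm_up = scheduler.optimizer.supports_preload
    not_done = True
    while not_done:
        # Run every PENDING trial that already has a TrialRunner assigned.
        scheduler.run_schedule(is_warm_up)
        # Register finished results with the Optimizer and queue its new suggestions.
        not_done = scheduler.add_new_optimizer_suggestions()
        # Round-robin assign TrialRunners to any still-unassigned PENDING trials
        # so they get picked up on the next pass of run_schedule().
        scheduler.assign_trial_runners(
            scheduler.experiment.pending_trials(
                datetime.now(UTC),
                running=False,
                trial_runner_assigned=False,
            )
        )
        is_warm_up = False

Splitting the old `_run_schedule` responsibilities this way means the storage query itself distinguishes "ready to run" (runner assigned) from "still needs a runner", so subclasses can override assignment policy without re-filtering trials in Python.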