164 changes: 122 additions & 42 deletions mlos_bench/mlos_bench/schedulers/base_scheduler.py
@@ -242,7 +242,6 @@ def __exit__(
self._in_context = False
return False # Do not suppress exceptions

@abstractmethod
def start(self) -> None:
"""Start the scheduling loop."""
assert self.experiment is not None
@@ -257,13 +256,33 @@ def start(self) -> None:

if self._config_id > 0:
tunables = self.load_tunable_config(self._config_id)
self.schedule_trial(tunables)
# If a config_id is provided, assume it is expected to be run immediately.
self.add_trial_to_queue(tunables, ts_start=datetime.now(UTC))

is_warm_up: bool = self.optimizer.supports_preload
if not is_warm_up:
_LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)

not_done: bool = True
while not_done:
_LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
self.run_schedule(is_warm_up)
not_done = self.add_new_optimizer_suggestions()
self.assign_trial_runners(
self.experiment.pending_trials(
datetime.now(UTC),
running=False,
trial_runner_assigned=False,
)
)
is_warm_up = False

def teardown(self) -> None:
"""
Tear down the TrialRunners/Environment(s).

Call it after the completion of the `.start()` in the scheduler context.
Call it after the completion of the :py:meth:`Scheduler.start` in the
Scheduler context.
"""
assert self.experiment is not None
if self._do_teardown:
@@ -290,54 +309,88 @@ def load_tunable_config(self, config_id: int) -> TunableGroups:
_LOG.debug("Config %d ::\n%s", config_id, json.dumps(tunable_values, indent=2))
return tunables.copy()

def _schedule_new_optimizer_suggestions(self) -> bool:
def add_new_optimizer_suggestions(self) -> bool:
"""
Optimizer part of the loop.

Load the results of the executed trials into the optimizer, suggest new
configurations, and add them to the queue. Return True if optimization is not
over, False otherwise.
Load the results of the executed trials into the
:py:class:`~.Optimizer`, suggest new configurations, and add them to the
queue.

Returns
-------
bool
True if the optimization process should continue requesting suggestions
from the Optimizer, False otherwise.
See Also: :py:meth:`~.Scheduler.not_done`.
"""
assert self.experiment is not None
(trial_ids, configs, scores, status) = self.experiment.load(self._last_trial_id)
_LOG.info("QUEUE: Update the optimizer with trial results: %s", trial_ids)
self.optimizer.bulk_register(configs, scores, status)
self._last_trial_id = max(trial_ids, default=self._last_trial_id)

# Check if the optimizer has converged or not.
not_done = self.not_done()
if not_done:
tunables = self.optimizer.suggest()
self.schedule_trial(tunables)

self.add_trial_to_queue(tunables)
return not_done

def schedule_trial(self, tunables: TunableGroups) -> None:
"""Add a configuration to the queue of trials."""
# TODO: Alternative scheduling policies may prefer to expand repeats over
# time as well as space, or adjust the number of repeats (budget) of a given
# trial based on whether initial results are promising.
def add_trial_to_queue(
self,
tunables: TunableGroups,
ts_start: datetime | None = None,
) -> None:
"""
Add a configuration to the queue of trials one or more times.

(e.g., according to :py:attr:`~.Scheduler.trial_config_repeat_count`)

Parameters
----------
tunables : TunableGroups
The tunable configuration to add to the queue.

ts_start : datetime.datetime | None
Optional timestamp to use to start the trial.

Notes
-----
Alternative scheduling policies may prefer to expand repeats over
time as well as space, or adjust the number of repeats (budget) of a given
trial based on whether initial results are promising.
"""
for repeat_i in range(1, self._trial_config_repeat_count + 1):
self._add_trial_to_queue(
tunables,
config={
# Add some additional metadata to track for the trial such as the
# optimizer config used.
# Note: these values are unfortunately mutable at the moment.
# Consider them as hints of what the config was the trial *started*.
# It is possible that the experiment configs were changed
# between resuming the experiment (since that is not currently
# prevented).
"optimizer": self.optimizer.name,
"repeat_i": repeat_i,
"is_defaults": tunables.is_defaults(),
**{
f"opt_{key}_{i}": val
for (i, opt_target) in enumerate(self.optimizer.targets.items())
for (key, val) in zip(["target", "direction"], opt_target)
},
},
ts_start=ts_start,
config=self._augment_trial_config_metadata(tunables, repeat_i),
)

def _augment_trial_config_metadata(
self,
tunables: TunableGroups,
repeat_i: int,
) -> dict[str, Any]:
return {
# Add some additional metadata to track for the trial such as the
# optimizer config used.
# Note: these values are unfortunately mutable at the moment.
# Consider them hints about what the config was when the trial *started*.
# It is possible that the experiment configs were changed when the
# experiment was resumed (since that is not currently prevented).
"optimizer": self.optimizer.name,
"repeat_i": repeat_i,
"is_defaults": tunables.is_defaults(),
**{
f"opt_{key}_{i}": val
for (i, opt_target) in enumerate(self.optimizer.targets.items())
for (key, val) in zip(["target", "direction"], opt_target)
},
}
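For concreteness, with a single minimization target the helper above produces per-trial
metadata roughly like the following (the optimizer and target names are illustrative,
not taken from this diff)::

    # Illustrative result of _augment_trial_config_metadata(tunables, repeat_i=1),
    # assuming self.optimizer.targets == {"score": "min"} and non-default tunables:
    example_trial_config_metadata = {
        "optimizer": "mlos_core",  # self.optimizer.name (assumed value)
        "repeat_i": 1,
        "is_defaults": False,
        "opt_target_0": "score",
        "opt_direction_0": "min",
    }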

def _add_trial_to_queue(
self,
tunables: TunableGroups,
@@ -355,10 +408,11 @@ def _add_trial_to_queue(

def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None:
"""
Assigns TrialRunners to the given Trial in batch.
Assigns a :py:class:`~.TrialRunner` to each :py:class:`~.Storage.Trial` in the
batch.

The base class implements a simple round-robin scheduling algorithm for each
Trial in sequence.
The base class implements a simple round-robin scheduling algorithm for
each Trial in sequence.

Subclasses can override this method to implement a more sophisticated policy.
For instance::
@@ -378,6 +432,11 @@ def assign_trial_runners(
trial.set_trial_runner(trial_runner)
...

Notes
-----
Subclasses are *not* required to assign a TrialRunner to the Trial
(e.g., if the Trial should be deferred to a later time).

Parameters
----------
trials : Iterable[Storage.Trial]
@@ -414,7 +473,8 @@ def assign_trial_runners(

def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner:
"""
Gets the TrialRunner associated with the given Trial.
Gets the :py:class:`~.TrialRunner` associated with the given
:py:class:`~.Storage.Trial`.

Parameters
----------
@@ -437,25 +497,45 @@ def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner:
assert trial_runner.trial_runner_id == trial.trial_runner_id
return trial_runner

def _run_schedule(self, running: bool = False) -> None:
def run_schedule(self, running: bool = False) -> None:
"""
Scheduler part of the loop.
Runs the current schedule of trials.

Check the queue for :py:class:`~.Storage.Trial` instances with
:py:attr:`~.Status.PENDING` status and an assigned
:py:attr:`~.Storage.Trial.trial_runner_id`, and run them with
:py:meth:`~.Scheduler.run_trial`.

Subclasses can override this method to implement a more sophisticated
scheduling policy.

Check for pending trials in the queue and run them.
Parameters
----------
running : bool
If True, run the trials that are already in a "running" state (e.g., to resume them).
If False (default), run the trials that are pending.
"""
assert self.experiment is not None
# Make sure that any pending trials have a TrialRunner assigned.
pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running))
self.assign_trial_runners(pending_trials)
pending_trials = list(
self.experiment.pending_trials(
datetime.now(UTC),
running=running,
trial_runner_assigned=True,
)
)
for trial in pending_trials:
assert (
trial.trial_runner_id is not None
), f"Trial {trial} has no TrialRunner assigned yet."
self.run_trial(trial)

def not_done(self) -> bool:
"""
Check the stopping conditions.

By default, stop when the optimizer converges or max limit of trials reached.
By default, stop when the :py:class:`.Optimizer` converges or the limit
of :py:attr:`~.Scheduler.max_trials` is reached.
"""
# TODO: Add more stopping conditions: https://github.com/microsoft/MLOS/issues/427
return self.optimizer.not_converged() and (
self._trial_count < self._max_trials or self._max_trials <= 0
)
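For context, a minimal sketch of how the reworked loop is typically driven from calling
code; the construction of the scheduler itself (config, optimizer, storage, trial
runners) is assumed and not part of this diff::

    # Illustrative only: `scheduler` is an already-constructed Scheduler subclass
    # (e.g., SyncScheduler); how it gets built is outside the scope of this diff.
    with scheduler:           # __enter__/__exit__ manage the experiment context
        scheduler.start()     # loops: run_schedule() -> add_new_optimizer_suggestions()
                              #        -> assign_trial_runners(), until not_done() is False
        scheduler.teardown()  # tears down TrialRunners/Environments if configured to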
18 changes: 2 additions & 16 deletions mlos_bench/mlos_bench/schedulers/sync_scheduler.py
@@ -15,24 +15,10 @@
class SyncScheduler(Scheduler):
"""A simple single-threaded synchronous optimization loop implementation."""

def start(self) -> None:
"""Start the optimization loop."""
super().start()

is_warm_up = self.optimizer.supports_preload
if not is_warm_up:
_LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)

not_done = True
while not_done:
_LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
self._run_schedule(is_warm_up)
not_done = self._schedule_new_optimizer_suggestions()
is_warm_up = False

def run_trial(self, trial: Storage.Trial) -> None:
"""
Set up and run a single trial.
Set up and run a single :py:class:`~.Storage.Trial` on its
:py:class:`~.TrialRunner`.

Save the results in the storage.
"""
12 changes: 8 additions & 4 deletions mlos_bench/mlos_bench/schedulers/trial_runner.py
@@ -20,6 +20,7 @@
from mlos_bench.services.types import SupportsConfigLoading
from mlos_bench.storage.base_storage import Storage
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.tunables.tunable_types import TunableValue

_LOG = logging.getLogger(__name__)

@@ -168,7 +169,7 @@ def run_trial(
self,
trial: Storage.Trial,
global_config: dict[str, Any] | None = None,
) -> None:
) -> tuple[Status, datetime, dict[str, TunableValue] | None]:
"""
Run a single trial on this TrialRunner's Environment and store the results in
the backend Trial Storage.
@@ -198,9 +199,10 @@ def run_trial(
if not self.environment.setup(trial.tunables, trial.config(global_config)):
_LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables)
# FIXME: Use the actual timestamp from the environment.
_LOG.info("TrialRunner: Update trial results: %s :: %s", trial, Status.FAILED)
trial.update(Status.FAILED, datetime.now(UTC))
return
(status, timestamp, results) = (Status.FAILED, datetime.now(UTC), None)
_LOG.info("TrialRunner: Update trial results: %s :: %s", trial, status)
trial.update(status, timestamp)
return (status, timestamp, results)

# TODO: start background status polling of the environments in the event loop.

@@ -221,6 +223,8 @@

self._is_running = False

return (status, timestamp, results)

def teardown(self) -> None:
"""
Tear down the Environment.
16 changes: 12 additions & 4 deletions mlos_bench/mlos_bench/storage/base_storage.py
@@ -313,18 +313,26 @@ def pending_trials(
timestamp: datetime,
*,
running: bool,
trial_runner_assigned: bool | None = None,
) -> Iterator["Storage.Trial"]:
"""
Return an iterator over the pending trials that are scheduled to run on or
before the specified timestamp.
Return an iterator over :py:attr:`~.Status.PENDING`
:py:class:`~.Storage.Trial` instances whose scheduled start time is on or
before the specified timestamp (or that have no start time set yet).

Parameters
----------
timestamp : datetime.datetime
The time in UTC to check for scheduled trials.
The time in UTC to check for scheduled Trials.
running : bool
If True, include the trials that are already running.
If True, include the Trials that are also
:py:attr:`~.Status.RUNNING` or :py:attr:`~.Status.READY`.
Otherwise, return only the scheduled trials.
trial_runner_assigned : bool | None
If True, return only the Trials that already have a
:py:class:`~.TrialRunner` assigned. If False, return only the Trials
that are not yet assigned to any :py:class:`~.TrialRunner`.
If None (default), return all pending Trials regardless of assignment.

Returns
-------
36 changes: 23 additions & 13 deletions mlos_bench/mlos_bench/storage/sql/experiment.py
@@ -235,25 +235,35 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d
row._tuple() for row in cur_result.fetchall() # pylint: disable=protected-access
)

def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
def pending_trials(
self,
timestamp: datetime,
*,
running: bool = False,
trial_runner_assigned: bool | None = None,
) -> Iterator[Storage.Trial]:
timestamp = utcify_timestamp(timestamp, origin="local")
_LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
if running:
pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name]
statuses = [Status.PENDING, Status.READY, Status.RUNNING]
else:
pending_status = [Status.PENDING.name]
statuses = [Status.PENDING]
with self._engine.connect() as conn:
cur_trials = conn.execute(
self._schema.trial.select().where(
self._schema.trial.c.exp_id == self._experiment_id,
(
self._schema.trial.c.ts_start.is_(None)
| (self._schema.trial.c.ts_start <= timestamp)
),
self._schema.trial.c.ts_end.is_(None),
self._schema.trial.c.status.in_(pending_status),
)
stmt = self._schema.trial.select().where(
self._schema.trial.c.exp_id == self._experiment_id,
(
self._schema.trial.c.ts_start.is_(None)
| (self._schema.trial.c.ts_start <= timestamp)
),
self._schema.trial.c.ts_end.is_(None),
self._schema.trial.c.status.in_([s.name for s in statuses]),
)
if trial_runner_assigned:
stmt = stmt.where(self._schema.trial.c.trial_runner_id.isnot(None))
elif trial_runner_assigned is False:
stmt = stmt.where(self._schema.trial.c.trial_runner_id.is_(None))
# else: # No filtering by trial_runner_id
cur_trials = conn.execute(stmt)
for trial in cur_trials.fetchall():
tunables = self._get_key_val(
conn,
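As a hedged usage sketch, the new ``trial_runner_assigned`` filter lets callers split
the pending queue the way the reworked ``Scheduler.start()`` loop does; ``experiment``
below is an already-open ``Storage.Experiment`` and is assumed for illustration::

    from datetime import UTC, datetime  # assumes Python 3.11+; mirrors datetime.now(UTC) above

    now = datetime.now(UTC)
    # Pending Trials that still need a TrialRunner assigned:
    unassigned = list(experiment.pending_trials(now, running=False, trial_runner_assigned=False))
    # Pending Trials ready to execute on their already-assigned TrialRunner:
    runnable = list(experiment.pending_trials(now, running=False, trial_runner_assigned=True))
    # All pending Trials, regardless of TrialRunner assignment (the default):
    all_pending = list(experiment.pending_trials(now, running=False))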