diff --git a/.github/instructions/README.md b/.github/instructions/README.md new file mode 100644 index 00000000000..9675885d684 --- /dev/null +++ b/.github/instructions/README.md @@ -0,0 +1,20 @@ +# Custom Copilot Instructions + +This directory contains custom instructions for the Copilot AI assistant. +The instructions are designed to guide the AI in providing responses that align with specific project needs and preferences. + +## Organization + +- Language-specific instructions go in their own file, with the cross-task instructions in the root of this directory. + + (e.g., [python.instructions.md](python.instructions.md), [bash.instructions.md](bash.instructions.md), [markdown.instructions.md](markdown.instructions.md), [json.instructions.md](json.instructions.md)). + +- Instructions relevant to all languages go in the `default.instructions.md` file. + + (e.g., [default.instructions.md](default.instructions.md)). + +## See Also + +- +- [.vscode/settings.json](../.vscode/settings.json) + - Configures which of these instructions are used for each file type. diff --git a/.github/instructions/bash.instructions.md b/.github/instructions/bash.instructions.md new file mode 100644 index 00000000000..02b863f55bf --- /dev/null +++ b/.github/instructions/bash.instructions.md @@ -0,0 +1,33 @@ +--- +applyTo: '**/*.sh' +--- + +# Bash shell scripting language instructions + +- Include instructions from [default.instructions.md](default.instructions.md) for all languages. + +- Scripts should use `set -e` or `set -o errexit` to exit on error. + +- Scripts should use `set -u` or `set -o nounset` to exit on unset variables. + +- Scripts should use `set -o pipefail` to exit on errors in pipelines. + +- Commands should be checked for non-zero exit codes and either handled or reported. + +- Scripts should use portable syntax that works on both macOS and Linux. + +- Scripts should validate input. + +- Scripts should include usage instructions. + +- Scripts should be executable (e.g., `chmod +x`). + +- Scripts should include a shebang line (e.g., `#!/usr/bin/env bash`). + +- Scripts should be well commented. + +- Scripts should include documentation updates if needed. + +- Scripts should be well formatted. + +- `if` and `then` should be on the same line. diff --git a/.github/instructions/default.instructions.md b/.github/instructions/default.instructions.md new file mode 100644 index 00000000000..573ef7e2708 --- /dev/null +++ b/.github/instructions/default.instructions.md @@ -0,0 +1,16 @@ +--- +applyTo: '**' +--- + +# Default language review selection instructions + +- PRs should be small and focused on a single change or feature. +- PRs should have a clear description of what the change is and why it is needed. +- PRs should have tests that cover the changes made. +- PRs should try not to include any commented-out code. +- PRs should not include any unnecessary files or changes to files that are not related to the change being made. +- PRs should not include any changes to the README or other documentation unless it is directly related to the change being made. +- PRs should be well commented. +- PRs should include documentation updates if needed. +- PRs should try to avoid unnecessary formatting changes or else keep them to their own PR that is just for formatting. +- PRs that change the CI pipeline or pre-commit hooks should generally be kept to their own PR.
diff --git a/.github/instructions/json.instructions.md b/.github/instructions/json.instructions.md new file mode 100644 index 00000000000..49ed88ab035 --- /dev/null +++ b/.github/instructions/json.instructions.md @@ -0,0 +1,15 @@ +--- +applyTo: '**/*.json,**/*.jsonc,**/*.json5' +--- + +# JSON language instructions + +- Include instructions from [default.instructions.md](default.instructions.md) for all languages. + +- Files with a `.json` extension that are ARM templates or JSON schema files should be well formatted and valid JSON, without any comments, trailing commas, etc. + +- Files with a `.json` extension that are VSCode settings files (e.g., inside the [.vscode](../../../.vscode) or [.devcontainer](../../../.devcontainer) directories) should be well formatted and valid JSON, but may contain comments, trailing commas, etc. + +- Files with a `.jsonc` or `.json5` extension should be well formatted and valid JSON5, JSONC, or JSON, and can include comments, trailing commas, etc. + +- If a file is an `mlos_bench` config, it should have a `.mlos.jsonc`, `.mlos.json`, or `.mlos.json5` extension, and should generally match the schemas defined in the [mlos_bench/configs/schemas/](../../../mlos_bench/mlos_bench/config/schemas/) directory (e.g., [mlos-bench-config-schema.json](../../../mlos_bench/mlos_bench/config/schemas/mlos-bench-config-schema.json)), unless it is a test config under the [tests/configs/schemas](../../../mlos_bench/mlos_bench/tests/configs/schemas/) directory. diff --git a/.github/instructions/markdown.instructions.md b/.github/instructions/markdown.instructions.md new file mode 100644 index 00000000000..1b2a450a68f --- /dev/null +++ b/.github/instructions/markdown.instructions.md @@ -0,0 +1,9 @@ +--- +applyTo: '**/*.md' +--- + +# Markdown language instructions + +- Include instructions from [default.instructions.md](default.instructions.md) for all languages. +- Documentation should include relative links to other documentation files whenever possible. +- Markdown files should be well formatted and valid Markdown conforming to markdownlint rules. diff --git a/.github/instructions/python.instructions.md b/.github/instructions/python.instructions.md new file mode 100644 index 00000000000..f81a76df843 --- /dev/null +++ b/.github/instructions/python.instructions.md @@ -0,0 +1,85 @@ +--- +applyTo: '**/*.py' +--- + +# Python language file instructions + +- Include instructions from [default.instructions.md](default.instructions.md) for all languages. + +- All functions, methods, classes, and attributes should have docstrings. + +- Docstrings should include Sphinx-style crossref directives for functions, methods, classes, attributes, and data whenever possible using `:py:class:` or `:py:func:` or `:py:meth:` or `:py:attr:` or `:py:data:` syntax, respectively. + + See Also + +- Docstrings for modules should include a summary of the module's purpose and any important details about its usage. + + - Module docstrings should also include an executable example of how to use the module, including any important functions or classes or configuration options (especially those derived from a JSON config file) like any of those in `mlos_bench.environments`, `mlos_bench.services`, `mlos_bench.schedulers`, `mlos_bench.optimizers`, and `mlos_bench.storage`. + + For instance: + + ```python + ''' + This is an example module docstring for the mlos_bench.environments.my_special_env module. + + It should include some descriptive text about the module and its purpose.
+ + Example + ------- + It also includes some executable code examples. + + >>> import json5 as json + >>> # Load a JSON config string for a MySpecialEnvironment instance. + >>> json_string = """ + ... { + ... "class": "mlos_bench.environments.my_special_env.MySpecialEnvironment", + ... "name": "MySpecialEnvironment", + ... "config": { + ... "param1": 42, + ... "param2": "foo", + ... }, + ... } + ... """ + >>> config = json.loads(json_string) + + >>> from mlos_bench.environments.my_special_env import MySpecialEnvironment + >>> my_env = MySpecialEnvironment(config=config) + >>> print(my_env) + MySpecialEnvironment(param1=42, param2='foo') + ''' + ``` + + - Docstrings for classes can refer to their module docstring with `:py:mod:` cross-references for usage examples to allow easier browser navigation of generated documentation. + + For instance: + + ```python + class MySpecialEnv: + """ + This is the class docstring for MySpecialEnv. + + It should include some descriptive text about the class and its purpose. + + Example + ------- + Refer to :py:mod:`mlos_bench.environments.my_special_env` for usage examples. + """ + ``` + +- If not all arguments to a function or method fit on the same line, then they should each be on their own line. + + Adding a trailing comma to the last argument is optional, but recommended for consistency whenever a single line is insufficient. + +- Code should be formatted using `black`. + +- Code should be type checked using `mypy`. + + - All function and method parameters should be type annotated. + +- Code should be linted using `pylint`. + +- Tests should be included for all new code and should be run using `pytest`. + +- Tests should be organized roughly the same way as the code they are testing (e.g., `tests/some/test_module.py` for `some/module.py`). + +- Test fixtures that set up the resources for the tests (e.g., Environments, Services, Storage, Optimizer, Scheduler, etc.) should be included in a `conftest.py` or `fixtures.py` file in the same directory as the tests. diff --git a/.github/instructions/rst.instructions.md b/.github/instructions/rst.instructions.md new file mode 100644 index 00000000000..1096e8b7a28 --- /dev/null +++ b/.github/instructions/rst.instructions.md @@ -0,0 +1,11 @@ +--- +applyTo: '**/*.rst' +--- + +# RST markup language instructions + +- Include instructions from [default.instructions.md](default.instructions.md) for all languages. + +- Documentation should include Sphinx crossref directives for functions, methods, and classes whenever possible. + + See Also: diff --git a/.github/prompts/README.md b/.github/prompts/README.md new file mode 100644 index 00000000000..05d11399989 --- /dev/null +++ b/.github/prompts/README.md @@ -0,0 +1,29 @@ +# Custom `.prompt.md` files for GitHub Copilot + +This directory contains custom `.prompt.md` files for GitHub Copilot. + +These files are used to customize the behavior of GitHub Copilot when generating code. + +They can be invoked with the `/custom-prompt-file-name-prefix` command in the Copilot Chat view (generally when in Agent mode). + +For instance: + +```txt +/add-sphinx-crossref-to-docstrings +``` + +will invoke the [`add-sphinx-crossref-to-docstrings.prompt.md`](./add-sphinx-crossref-to-docstrings.prompt.md) file. + +Some prompts take additional arguments to help Copilot understand the context of the code being generated or other actions to take. + +## Types of Custom Prompts + +There are two types of custom prompts: + +1.
Those for MLOS developers (e.g. `add-sphinx-crossref-to-docstrings.prompt.md`). +1. Those for MLOS users (e.g., `generate-mlos-configuration-file.prompt.md`). + +## See Also + +- +- [TODO.md](./TODO.md) diff --git a/.github/prompts/TODO.md b/.github/prompts/TODO.md new file mode 100644 index 00000000000..4deda2addac --- /dev/null +++ b/.github/prompts/TODO.md @@ -0,0 +1,19 @@ +# Custom Prompt TODOs + +- MLOS + + - [ ] Create experiment configs + - [ ] Create (or reuse) environments configs + - [ ] Use `include` directives with `CompositeEnv` to help structure and nest things. + - [ ] Create (or reuse) services configs + - [ ] Create (or reuse) ARM templates + - [ ] Create (or reuse) storage configs + - [ ] Create (or reuse) scheduler configs + +- mlos-autotuning-template + + - [ ] Create new X config + - [ ] `include` config to do Y + - Add `globals` variables to configure X + - Create (or reuse) ARM template for X + - Make sure scripting commands are idempotent diff --git a/.github/prompts/add-config-examples-to-module-docstring.prompt.md b/.github/prompts/add-config-examples-to-module-docstring.prompt.md new file mode 100644 index 00000000000..e9007eff197 --- /dev/null +++ b/.github/prompts/add-config-examples-to-module-docstring.prompt.md @@ -0,0 +1,54 @@ +# Custom Prompt: Add config examples to module docstrings + +Let's add config examples to module docstrings in this mlos_bench module. + +- Docstrings for modules should include a summary of the module's purpose and any important details about its usage. + - Module docstrings should also include an executable example of how to use the module, including any important functions or classes or configuration options (especially those derived from a JSON config file) like any of those in `mlos_bench.environments`, `mlos_bench.services`, `mlos_bench.schedulers`, `mlos_bench.optimizers`, and `mlos_bench.storage`. + + For instance: + + ```python + ''' + This is an example module docstring for the mlos_bench.environments.my_special_env module. + + It should include some descriptive text about the module and its purpose. + + Example + ------- + It also includes some executable code examples. + + >>> import json5 as json + >>> # Load a JSON config string for a MySpecialEnvironment instance. + >>> json_string = """ + ... { + ... "class": "mlos_bench.environments.my_special_env.MySpecialEnvironment", + ... "name": "MySpecialEnvironment", + ... "config": { + ... "param1": 42, + ... "param2": "foo", + ... }, + ... } + ... """ + >>> config = json.loads(json_string) + + >>> from mlos_bench.environments.my_special_env import MySpecialEnvironment + >>> my_env = MySpecialEnvironment(config=config) + >>> print(my_env) + MySpecialEnvironment(param1=42, param2='foo') + ''' + ``` + + - Configuration options for these modules should be derived from a JSON, included as a string in the module docstring so users reading the documentation can easily copy/paste, but generally they are loaded from a separate `.mlos.jsonc` config file. + + - The JSON config string should be formatted using `json5` to allow for comments and trailing commas. + + - The JSON config options should conform to the relevant JSON schema for the module, usually defined in the [mlos_bench/configs/schemas](../../mlos_bench/mlos_bench/config/schemas/) directory. 
+ For instance: + + - For an `mlos_bench.environments` module, the JSON config options should conform to the [mlos_bench/configs/schemas/environments](../../mlos_bench/mlos_bench/config/schemas/environments/environment-schema.json) schema file. + - For an `mlos_bench.services` module, the JSON config options should conform to the [mlos_bench/configs/schemas/services](../../mlos_bench/mlos_bench/config/schemas/services/service-schema.json) schema file. + - For an `mlos_bench.schedulers` module, the JSON config options should conform to the [mlos_bench/configs/schemas/schedulers](../../mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json) schema file. + - For an `mlos_bench.storage` module, the JSON config options should conform to the [mlos_bench/configs/schemas/storage](../../mlos_bench/mlos_bench/config/schemas/storage/storage-schema.json) schema file. + - For an `mlos_bench.optimizers` module, the JSON config options should conform to the [mlos_bench/configs/schemas/optimizers](../../mlos_bench/mlos_bench/config/schemas/optimizers/optimizer-schema.json) schema file. + + - The other options that the config can take can often also be found in the parsing of the `config` argument in the `__init__` method body of the class, but they should be included in the module docstring as well. diff --git a/.github/prompts/add-sphinx-crossref-to-docstrings.prompt.md b/.github/prompts/add-sphinx-crossref-to-docstrings.prompt.md new file mode 100644 index 00000000000..a6e7203e5ac --- /dev/null +++ b/.github/prompts/add-sphinx-crossref-to-docstrings.prompt.md @@ -0,0 +1,52 @@ +# Custom Prompt: Add Sphinx Crossref Links to Python Docstrings + +Add Sphinx cross-references to python docstrings referencing classes or functions or methods or attributes or data in this file using `:py:class:` or `:py:func:` or `:py:meth:` or `:py:attr:` or `:py:data` syntax, respectively. +\- See Also + +We don't need to do this for the parameter types listed in the Parameters or Returns sections of the docstring though. + +For example: + +```python +def example_function(param1: MyClass, param2: MyOtherClass) -> SomeOtherType: + """ + Example function working on an instance of MyClass and MyOtherClass. + + Parameters + ---------- + param1 : MyClass + An instance of MyClass. + param2 : MyOtherClass + An instance of MyOtherClass. + + Returns + ------- + SomeOtherType + An instance of SomeOtherType. + """ + pass +``` + +should be changed to: + +```python +def example_function(param1: MyClass, param2: MyOtherClass) -> SomeOtherType: + """ + Example function working on an instance of :py:class:`MyClass` and :py:class:`MyOtherClass`. + + Uses the :py:meth:`MyClass.method_name` method and the :py:attr:`MyOtherClass.attribute_name` attribute. + + Parameters + ---------- + param1 : MyClass + An instance of :py:class:`MyClass`. + param2 : MyOtherClass + An instance of :py:class:`MyOtherClass`. + + Returns + ------- + SomeOtherType + An instance of :py:class:`SomeOtherType`. + """ + pass +``` diff --git a/.github/prompts/split-large-change-branch-to-separate-change-branches.prompt.md b/.github/prompts/split-large-change-branch-to-separate-change-branches.prompt.md new file mode 100644 index 00000000000..60d6adeaa28 --- /dev/null +++ b/.github/prompts/split-large-change-branch-to-separate-change-branches.prompt.md @@ -0,0 +1,8 @@ +# Custom Prompt: Split A Large Change Branch into Separate Change Branches + +The branch I have has too many changes and I want to start splitting them out to separate PRs. 
Can you help me do the following: + +1. Run `git diff main > /tmp/git-diff-main.patch` to get the differences between the `main` branch and the current branch and then use `/tmp/git-diff-main.patch` to summarize the branch's changes into distinct groups. You can use the description from the GitHub PR associated with this branch to see what else was done and needs to be split out. +1. Propose a sequence of changes to stage each of those sets of changes as different self-contained and testable PRs. +1. Help me create those PRs by applying each group's changes to new and different branches. We don't need to retain the history of this branch, just the changes. For instance, we could use `git checkout -b new-branch && git diff -- some/list of/files | git apply -` to make a new branch with some selected changes applied and then stage only the relevant changes to each file for that particular PR. + Show me the plan and pause to check in with me between each step. diff --git a/.vscode/settings.json b/.vscode/settings.json index 23e43fed683..f791711652c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,4 @@ -// vim: set ft=jsonc: +// vim: set ft=json5: { "makefile.extensionOutputFolder": "./.vscode", "files.exclude": { @@ -147,7 +147,7 @@ "[python]": { "editor.codeActionsOnSave": { "source.organizeImports": "explicit", - "source.unusedImports": "explicit" + //"source.unusedImports": "explicit" }, "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true, @@ -170,5 +170,11 @@ "python.testing.unittestEnabled": false, "debugpy.debugJustMyCode": false, "python.analysis.autoImportCompletions": true, - "python.analysis.supportRestructuredText": true + "python.analysis.supportRestructuredText": true, + "makefile.configureOnOpen": false, + + "githubPullRequests.experimental.chat": true, + "github.copilot.chat.codesearch.enabled": true, + "github.copilot.chat.copilotDebugCommand.enabled": true, + "github.copilot.chat.reviewSelection.enabled": true } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ef9ab4fdea6..27121a96ce5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -97,10 +97,12 @@ We expect development to follow a typical "forking" style workflow: Some notes on organizing changes to help reviewers: - 1. Please try to keep PRs small whenver possible and don't include unnecessaary formatting changes. + 1. Please try to keep PRs small whenever possible and don't include unnecessary formatting changes. 1. Larger changes can be planned in [Issues](https://github.com/microsoft/MLOS/issues), prototyped in a large draft PR for early feedback, and split into smaller PRs via discussion. 1. All changes should include test coverage (either new or existing). + > For additional advice on PR reviews, see [.github/instructions/](.github/instructions/) for Copilot instructions. + 1. PRs are associated with [Github Issues](https://github.com/microsoft/MLOS/issues) and need [MLOS-committers](https://github.com/orgs/microsoft/teams/MLOS-committers) to sign-off (in addition to other CI pipeline checks like tests and lint checks to pass). 1. Once approved, the PR can be completed using a squash merge in order to keep a nice linear history.
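The next hunk adds a sample `ParallelScheduler` config. As a quick sanity check outside of `mlos_bench` itself, a config like this can be validated against the scheduler JSON schema included in the repo. A minimal sketch, assuming the third-party `json5` and `jsonschema` packages and repo-root-relative paths (illustrative only, not how `mlos_bench` loads configs):

```python
import json

import json5  # handles JSONC/JSON5, i.e., comments and trailing commas
import jsonschema

# Paths are relative to the repo root (assumption for illustration).
with open("mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc", encoding="utf-8") as fh:
    config = json5.load(fh)
with open("mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", encoding="utf-8") as fh:
    schema = json.load(fh)

# Raises jsonschema.exceptions.ValidationError if the config does not match the schema.
jsonschema.validate(instance=config, schema=schema)
print(config["class"])  # mlos_bench.schedulers.ParallelScheduler
```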
diff --git a/mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc b/mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc new file mode 100644 index 00000000000..4d6b7a7e272 --- /dev/null +++ b/mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc @@ -0,0 +1,12 @@ +// Mock optimizer to test the benchmarking framework. +{ + "$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", + + "class": "mlos_bench.schedulers.ParallelScheduler", + + "config": { + "trial_config_repeat_count": 3, + "max_trials": -1, // Limited only in the Optimizer logic/config. + "teardown": false + } +} diff --git a/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json b/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json index 81b2e797547..dedac1ed758 100644 --- a/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json +++ b/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json @@ -2,12 +2,10 @@ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", "title": "mlos_bench Scheduler config", - "$defs": { "comment": { "$comment": "This section contains reusable partial schema bits (or just split out for readability)" }, - "config_base_scheduler": { "$comment": "config properties common to all Scheduler types.", "type": "object", @@ -29,18 +27,23 @@ "description": "Max. number of trials to run. Use -1 or 0 for unlimited.", "type": "integer", "minimum": -1, - "examples": [50, -1] + "examples": [ + 50, + -1 + ] }, "trial_config_repeat_count": { "description": "Number of times to repeat a config.", "type": "integer", "minimum": 1, - "examples": [3, 5] + "examples": [ + 3, + 5 + ] } } } }, - "description": "config for the mlos_bench scheduler", "$comment": "top level schema document rules", "type": "object", @@ -51,21 +54,20 @@ "$comment": "This is optional, but if provided, should match the name of this file.", "pattern": "/schemas/schedulers/scheduler-schema.json$" }, - "description": { "description": "Optional description of the config.", "type": "string" }, - "class": { "description": "The name of the scheduler class to use.", "$comment": "required", "enum": [ "mlos_bench.schedulers.SyncScheduler", - "mlos_bench.schedulers.sync_scheduler.SyncScheduler" + "mlos_bench.schedulers.sync_scheduler.SyncScheduler", + "mlos_bench.schedulers.ParallelScheduler", + "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler" ] }, - "config": { "description": "The scheduler-specific config.", "$comment": "Stub for scheduler-specific config appended with condition statements below", @@ -73,8 +75,9 @@ "minProperties": 1 } }, - "required": ["class"], - + "required": [ + "class" + ], "oneOf": [ { "$comment": "extensions to the 'config' object properties when synchronous scheduler is being used", @@ -83,17 +86,25 @@ "class": { "enum": [ "mlos_bench.schedulers.SyncScheduler", - "mlos_bench.schedulers.sync_scheduler.SyncScheduler" + "mlos_bench.schedulers.sync_scheduler.SyncScheduler", + "mlos_bench.schedulers.ParallelScheduler", + "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler" ] } }, - "required": ["class"] + "required": [ + "class" + ] }, "then": { "properties": { "config": { "type": "object", - "allOf": [{ "$ref": "#/$defs/config_base_scheduler" }], + "allOf": [ + { + "$ref": "#/$defs/config_base_scheduler" + } + 
], "$comment": "disallow other properties", "unevaluatedProperties": false } diff --git a/mlos_bench/mlos_bench/event_loop_context.py b/mlos_bench/mlos_bench/event_loop_context.py index d071c629912..4aaa74e64ee 100644 --- a/mlos_bench/mlos_bench/event_loop_context.py +++ b/mlos_bench/mlos_bench/event_loop_context.py @@ -39,6 +39,7 @@ class EventLoopContext: def __init__(self) -> None: self._event_loop: AbstractEventLoop | None = None self._event_loop_thread: Thread | None = None + # TODO: Check if we can fork the ThreadLock or need to delay using that. self._event_loop_thread_lock = ThreadLock() self._event_loop_thread_refcnt: int = 0 diff --git a/mlos_bench/mlos_bench/schedulers/__init__.py b/mlos_bench/mlos_bench/schedulers/__init__.py index 381261e53da..fd381612be7 100644 --- a/mlos_bench/mlos_bench/schedulers/__init__.py +++ b/mlos_bench/mlos_bench/schedulers/__init__.py @@ -5,9 +5,11 @@ """Interfaces and implementations of the optimization loop scheduling policies.""" from mlos_bench.schedulers.base_scheduler import Scheduler +from mlos_bench.schedulers.parallel_scheduler import ParallelScheduler from mlos_bench.schedulers.sync_scheduler import SyncScheduler __all__ = [ "Scheduler", "SyncScheduler", + "ParallelScheduler", ] diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index b711388609d..11db5a88bdb 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -28,6 +28,7 @@ class Scheduler(ContextManager, metaclass=ABCMeta): # pylint: disable=too-many-instance-attributes + # pylint: disable=too-many-public-methods """Base class for the optimization loop scheduling policies.""" def __init__( # pylint: disable=too-many-arguments @@ -100,8 +101,9 @@ def __init__( # pylint: disable=too-many-arguments self._optimizer = optimizer self._storage = storage self._root_env_config = root_env_config - self._last_trial_id = -1 + self._longest_finished_trial_sequence_id = -1 self._ran_trials: list[Storage.Trial] = [] + self._registered_trial_ids: set[int] = set() _LOG.debug("Scheduler instantiated: %s :: %s", self, config) @@ -242,7 +244,6 @@ def __exit__( self._in_context = False return False # Do not suppress exceptions - @abstractmethod def start(self) -> None: """Start the scheduling loop.""" assert self.experiment is not None @@ -257,13 +258,55 @@ def start(self) -> None: if self._config_id > 0: tunables = self.load_tunable_config(self._config_id) - self.schedule_trial(tunables) + # If a config_id is provided, assume it is expected to be run immediately. + self.add_trial_to_queue(tunables, ts_start=datetime.now(UTC)) + + is_warm_up: bool = self.optimizer.supports_preload + if not is_warm_up: + _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) + + not_done: bool = True + while not_done: + _LOG.info( + "Optimization loop: Longest finished trial sequence ID: %d", + self._longest_finished_trial_sequence_id, + ) + self.run_schedule(is_warm_up) + self.wait_for_trial_runners() + not_done = self.add_new_optimizer_suggestions() + self.assign_trial_runners( + self.experiment.pending_trials( + datetime.now(UTC), + running=False, + trial_runner_assigned=False, + ) + ) + is_warm_up = False + self.wait_for_trial_runners(wait_all=True) + + @abstractmethod + def wait_for_trial_runners(self, wait_all: bool = False) -> None: + """ + Wait for (enough) TrialRunners to finish. + + This is a blocking call that waits for enough of the the TrialRunners to finish. 
+ The base class implementation waits for all of the TrialRunners to finish. + However this can be overridden in subclasses to implement a more asynchronous behavior. + + Parameters + ---------- + wait_all : bool + If True, wait for all TrialRunners to finish. + If False, wait for "enough" TrialRunners to finish (which for the + base class is all of them). + """ def teardown(self) -> None: """ Tear down the TrialRunners/Environment(s). - Call it after the completion of the `.start()` in the scheduler context. + Call it after the completion of the :py:meth:`Scheduler.start` in the + Scheduler context. """ assert self.experiment is not None if self._do_teardown: @@ -290,54 +333,112 @@ def load_tunable_config(self, config_id: int) -> TunableGroups: _LOG.debug("Config %d ::\n%s", config_id, json.dumps(tunable_values, indent=2)) return tunables.copy() - def _schedule_new_optimizer_suggestions(self) -> bool: + def add_new_optimizer_suggestions(self) -> bool: """ Optimizer part of the loop. - Load the results of the executed trials into the optimizer, suggest new - configurations, and add them to the queue. Return True if optimization is not - over, False otherwise. + Load the results of the executed trials into the + :py:class:`~.Optimizer`, suggest new configurations, and add them to the + queue. + + Returns + ------- + bool + The return value indicates whether the optimization process should + continue to get suggestions from the Optimizer or not. + See Also: :py:meth:`~.Scheduler.not_done`. """ assert self.experiment is not None - (trial_ids, configs, scores, status) = self.experiment.load(self._last_trial_id) + # Load the results of the trials that have been run since the last time + # we queried the Optimizer. + # Note: We need to handle the case of straggler trials that finish out of order. + (trial_ids, configs, scores, status) = self.experiment.load( + last_trial_id=self._longest_finished_trial_sequence_id, + omit_registered_trial_ids=self._registered_trial_ids, + ) _LOG.info("QUEUE: Update the optimizer with trial results: %s", trial_ids) self.optimizer.bulk_register(configs, scores, status) - self._last_trial_id = max(trial_ids, default=self._last_trial_id) + # Mark those trials as registered so we don't load them again. + self._registered_trial_ids.update(trial_ids) + # Update the longest finished trial sequence ID. + self._longest_finished_trial_sequence_id = max( + [ + self.experiment.get_longest_prefix_finished_trial_id(), + self._longest_finished_trial_sequence_id, + ], + default=self._longest_finished_trial_sequence_id, + ) + # Remove trial ids that are older than the longest finished trial sequence ID. + # This is an optimization to avoid a long list of trial ids to omit from + # the load() operation or a long list of trial ids to maintain in memory. + self._registered_trial_ids = { + trial_id + for trial_id in self._registered_trial_ids + if trial_id > self._longest_finished_trial_sequence_id + } + # Check if the optimizer has converged or not. not_done = self.not_done() if not_done: + # TODO: Allow scheduling multiple configs at once (e.g., in the case of idle workers). 
tunables = self.optimizer.suggest() - self.schedule_trial(tunables) - + self.add_trial_to_queue(tunables) return not_done - def schedule_trial(self, tunables: TunableGroups) -> None: - """Add a configuration to the queue of trials.""" - # TODO: Alternative scheduling policies may prefer to expand repeats over - # time as well as space, or adjust the number of repeats (budget) of a given - # trial based on whether initial results are promising. + def add_trial_to_queue( + self, + tunables: TunableGroups, + ts_start: datetime | None = None, + ) -> None: + """ + Add a configuration to the queue of trials 1 or more times. + + (e.g., according to the :py:attr:`~.Scheduler.trial_config_repeat_count`) + + Parameters + ---------- + tunables : TunableGroups + The tunable configuration to add to the queue. + + ts_start : datetime | None + Optional timestamp to use to start the trial. + + Notes + ----- + Alternative scheduling policies may prefer to expand repeats over + time as well as space, or adjust the number of repeats (budget) of a given + trial based on whether initial results are promising. + """ for repeat_i in range(1, self._trial_config_repeat_count + 1): self._add_trial_to_queue( tunables, - config={ - # Add some additional metadata to track for the trial such as the - # optimizer config used. - # Note: these values are unfortunately mutable at the moment. - # Consider them as hints of what the config was the trial *started*. - # It is possible that the experiment configs were changed - # between resuming the experiment (since that is not currently - # prevented). - "optimizer": self.optimizer.name, - "repeat_i": repeat_i, - "is_defaults": tunables.is_defaults(), - **{ - f"opt_{key}_{i}": val - for (i, opt_target) in enumerate(self.optimizer.targets.items()) - for (key, val) in zip(["target", "direction"], opt_target) - }, - }, + ts_start=ts_start, + config=self._augment_trial_config_metadata(tunables, repeat_i), ) + def _augment_trial_config_metadata( + self, + tunables: TunableGroups, + repeat_i: int, + ) -> dict[str, Any]: + return { + # Add some additional metadata to track for the trial such as the + # optimizer config used. + # Note: these values are unfortunately mutable at the moment. + # Consider them as hints of what the config was the trial *started*. + # It is possible that the experiment configs were changed + # between resuming the experiment (since that is not currently + # prevented). + "optimizer": self.optimizer.name, + "repeat_i": repeat_i, + "is_defaults": tunables.is_defaults(), + **{ + f"opt_{key}_{i}": val + for (i, opt_target) in enumerate(self.optimizer.targets.items()) + for (key, val) in zip(["target", "direction"], opt_target) + }, + } + def _add_trial_to_queue( self, tunables: TunableGroups, @@ -355,10 +456,10 @@ def _add_trial_to_queue( def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None: """ - Assigns TrialRunners to the given Trial in batch. + Assigns :py:class:`~.TrialRunner`s to the given :py:class:`~.Trial`s in batch. - The base class implements a simple round-robin scheduling algorithm for each - Trial in sequence. + The base class implements a simple round-robin scheduling algorithm for + each Trial in sequence. Subclasses can override this method to implement a more sophisticated policy. For instance:: @@ -378,6 +479,11 @@ def assign_trial_runners( trial.set_trial_runner(trial_runner) ... + Notes + ----- + Subclasses are *not* required to assign a TrialRunner to the Trial + (e.g., if the Trial should be deferred to a later time). 
+ Parameters + ---------- + trials : Iterable[Storage.Trial] @@ -414,7 +520,8 @@ def assign_trial_runners( def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner: """ - Gets the TrialRunner associated with the given Trial. + Gets the :py:class:`~.TrialRunner` associated with the given + :py:class:`~.Storage.Trial`. Parameters ---------- @@ -425,7 +532,11 @@ def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner: ------- TrialRunner """ + # FIXME: May need to improve handling here in the case where + # assign_trial_runners doesn't assign a TrialRunner to a particular + # Trial for some reason. if trial.trial_runner_id is None: + # TODO: Maybe we can force it here? self.assign_trial_runners([trial]) assert trial.trial_runner_id is not None trial_runner = self._trial_runners.get(trial.trial_runner_id) @@ -437,25 +548,30 @@ def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner: assert trial_runner.trial_runner_id == trial.trial_runner_id return trial_runner - def _run_schedule(self, running: bool = False) -> None: + def run_schedule(self, running: bool = False) -> None: """ - Scheduler part of the loop. + Runs the current schedule of trials. - Check for pending trials in the queue and run them. + Check for :py:class:`.Trial`s with :py:attr:`.Status.PENDING` and an + assigned :py:attr:`~.Trial.trial_runner_id` in the queue and run them + with :py:meth:`~.Scheduler.run_trial`. """ assert self.experiment is not None - # Make sure that any pending trials have a TrialRunner assigned. pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running)) - self.assign_trial_runners(pending_trials) for trial in pending_trials: + if trial.trial_runner_id is None: + _LOG.warning("Trial %s has no TrialRunner assigned yet.", trial) + continue self.run_trial(trial) def not_done(self) -> bool: """ Check the stopping conditions. - By default, stop when the optimizer converges or max limit of trials reached. + By default, stop when the :py:class:`.Optimizer` converges or the limit + of :py:attr:`~.Scheduler.max_trials` is reached. """ + # TODO: Add more stopping conditions: https://github.com/microsoft/MLOS/issues/427 return self.optimizer.not_converged() and ( self._trial_count < self._max_trials or self._max_trials <= 0 ) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py new file mode 100644 index 00000000000..1faf6fbb136 --- /dev/null +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -0,0 +1,512 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +""" +A simple multi-process asynchronous optimization loop implementation. + +TODO: Add more details about the design and constraints and gotchas here. + +Examples +-------- +TODO: Add config examples here.
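A sketch of what such an example might look like, with keys taken from this class's ``__init__`` and the sample ``config/schedulers/parallel_scheduler.jsonc`` added in this PR (note: ``idle_worker_scheduling_batch_size`` and ``polling_interval`` are not yet covered by the config schema, per the TODOs below):

>>> import json5 as json
>>> json_string = '''
... {
...     "class": "mlos_bench.schedulers.ParallelScheduler",
...     "config": {
...         "trial_config_repeat_count": 3,
...         "max_trials": -1,
...         "idle_worker_scheduling_batch_size": 1,
...         "polling_interval": 1.0,
...     },
... }
... '''
>>> config = json.loads(json_string)
>>> config["config"]["idle_worker_scheduling_batch_size"]
1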
+""" + +import logging +from collections.abc import Callable, Iterable +from datetime import datetime +from multiprocessing import current_process +from multiprocessing.pool import AsyncResult, Pool +from time import sleep +from typing import Any + +from attr import dataclass +from pytz import UTC + +from mlos_bench.environments.status import Status +from mlos_bench.optimizers.base_optimizer import Optimizer +from mlos_bench.schedulers.base_scheduler import Scheduler +from mlos_bench.schedulers.trial_runner import TrialRunner +from mlos_bench.storage.base_storage import Storage +from mlos_bench.tunables.tunable_types import TunableValue + +_LOG = logging.getLogger(__name__) + + +MAIN_PROCESS_NAME = "MainProcess" +"""Name of the main process in control of the +:external:py:class:`multiprocessing.Pool`. +""" + + +def is_child_process() -> bool: + """Check if the current process is a child process.""" + return current_process().name != MAIN_PROCESS_NAME + + +@dataclass +class TrialRunnerResult: + """A simple data class to hold the :py:class:`AsyncResult` of a + :py:class:`TrialRunner` operation. + """ + + trial_runner_id: int + results: dict[str, TunableValue] | None + timestamp: datetime | None = None + status: Status | None = None + trial_id: int | None = None + + +class ParallelScheduler(Scheduler): + """ + A simple multi-process asynchronous optimization loop implementation. + + See :py:mod:`mlos_bench.schedulers.parallel_scheduler` for more usage details. + + Notes + ----- + This scheduler uses :external:py:class:`multiprocessing.Pool` to run + :py:class:`~.Storage.Trial`s in parallel. + + To avoid issues with Python's forking implementation, which relies on pickling + objects and functions from the main process and sending them to the child + process to invoke, we need to avoid incompatible objects, which includes any + additional threads (e.g., :py:mod:`asyncio` tasks such as + :py:class:`mlos_bench.event_loop_context.EventLoopContext`), database + connections (e.g., :py:mod:`mlos_bench.storage`), and file handles (e.g., + :external:py:mod:`logging`) that are pickle-incompatible. + + To accomplish this, we avoid entering the :py:class:`~.TrialRunner` context + until we are in the child process and allow each child to manage its own + incompatible resources via that context. + + Hence, each child process in the pool actually starts in + special handler functions in the :py:class:`~.ParallelScheduler` class that + receive as inputs all the necessary (and picklable) info as arguments, then + enter the given :py:class:`~.TrialRunner` instance context and invoke that + procedure. + + For instance, :py:meth:`~.ParallelScheduler.teardown_trial_runner` is a static + method that receives a not-yet-entered TrialRunner, enters its context inside + the worker process, and runs the teardown there. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + *, + config: dict[str, Any], + global_config: dict[str, Any], + trial_runners: Iterable[TrialRunner], + optimizer: Optimizer, + storage: Storage, + root_env_config: str, + ): + super().__init__( + config=config, + global_config=global_config, + trial_runners=trial_runners, + optimizer=optimizer, + storage=storage, + root_env_config=root_env_config, + ) + + # TODO: Add schema support for this config. + self._idle_worker_scheduling_batch_size = int( + # By default, wait for 1 idle worker before scheduling new trials. + config.get("idle_worker_scheduling_batch_size", 1) + ) + # Never wait for more than the number of trial runners.
+ self._idle_worker_scheduling_batch_size = min( + self._idle_worker_scheduling_batch_size, + len(self._trial_runners), + ) + if self._idle_worker_scheduling_batch_size < 1: + _LOG.warning( + "Idle worker scheduling is set to %d, which is less than 1. " + f"Setting it to number of TrialRunners {len(self._trial_runners)}.", + self._idle_worker_scheduling_batch_size, + ) + self._idle_worker_scheduling_batch_size = len(self._trial_runners) + + # TODO: Add schema support for this config. + self._polling_interval = float(config.get("polling_interval", 1.0)) + + # TODO: Setup logging for the child processes via a logging queue. + + self._pool: Pool | None = None + """ + Parallel :external:py:class:`.Pool` to run :py:class:`~.Storage.Trial`s in + separate :py:class:`.TrialRunner` processes. + + Only initiated on context :py:meth:`.__enter__`. + """ + + self._trial_runners_status: dict[int, AsyncResult[TrialRunnerResult] | None] = { + trial_runner.trial_runner_id: None for trial_runner in self._trial_runners.values() + } + """ + A dict to keep track of the status of each :py:class:`.TrialRunner`. + + Since TrialRunners enter their running context within each pool task, we + can't check :py:meth:`.TrialRunner.is_running` within the parent + generally. + + Instead, we use a dict to keep track of the status of each TrialRunner + as either None (idle) or AsyncResult (running). + + This also helps us to gather AsyncResults from each worker. + """ + + @property + def idle_worker_scheduling_batch_size(self) -> int: + """ + Get the batch size for idle worker scheduling. + + This is the number of idle workers to wait for before scheduling new trials. + """ + return self._idle_worker_scheduling_batch_size + + def _get_idle_trial_runners_count(self) -> int: + """ + Return a count of idle trial runners. + + Can be used as a hint for the number of new trials we can run when we next get + more suggestions from the Optimizer. + """ + return len( + [ + trial_runner_status + for trial_runner_status in self._trial_runners_status.values() + if trial_runner_status is None + ] + ) + + def _has_running_trial_runners(self) -> bool: + """Check to see if any TrialRunners are currently busy.""" + return any( + True + for trial_runner_status in self._trial_runners_status.values() + if trial_runner_status is not None + ) + + def __enter__(self): + # Setup the process pool to run the trials in parallel. + self._pool = Pool(processes=len(self.trial_runners), maxtasksperchild=1) + self._pool.__enter__() + # Delay context entry in the parent process + return super().__enter__() + + def __exit__(self, ex_type, ex_val, ex_tb): + assert self._pool is not None + # Shutdown the process pool and wait for all tasks to finish + # (everything should be done by now anyways) + assert not self._has_running_trial_runners() + assert self._get_idle_trial_runners_count() == len(self._trial_runners) + self._pool.close() + self._pool.join() + self._pool.__exit__(ex_type, ex_val, ex_tb) + return super().__exit__(ex_type, ex_val, ex_tb) + + @staticmethod + def run_trial_on_trial_runner( + storage: Storage, + experiment_id: str, + trial_id: int, + trial_runner: TrialRunner, + global_config: dict[str, Any] | None, + ) -> TrialRunnerResult: + """ + Retrieve and run a :py:class:`~.Storage.Trial` on a specific + :py:class:`.TrialRunner` in a :py:class:`~.Pool` background worker process. + + Parameters + ---------- + storage : Storage + The :py:class:`~.Storage` to use to retrieve the :py:class:`.Storage.Trial`. 
+ experiment_id : str + The ID of the experiment the trial is a part of. + trial_id : int + The ID of the trial. + trial_runner : TrialRunner + The :py:class:`.TrialRunner` to run on. + global_config : dict[str, Any] | None + The global configuration to use for the trial. + + Returns + ------- + TrialRunnerResult + The result of the :py:meth:`.TrialRunner.run_trial` operation. + + Notes + ----- + This is called in the Pool worker process, so it must receive arguments + that are picklable and be able to construct all necessary state from that. + Upon completion a callback is used to update the status of the + TrialRunner in the ParallelScheduler with the value in the + TrialRunnerResult. + """ + assert is_child_process(), "This should be called in a Pool worker." + exp = storage.get_experiment_by_id(experiment_id) + assert exp is not None, "Experiment not found." + trial = exp.get_trial_by_id(trial_id) + assert trial is not None, "Trial not found." + assert ( + trial.trial_runner_id == trial_runner.trial_runner_id + ), f"Unexpected Trial Runner {trial_runner} for Trial {trial}." + with trial_runner: + (status, ts, results) = trial_runner.run_trial(trial, global_config) + return TrialRunnerResult( + trial_runner_id=trial_runner.trial_runner_id, + results=results, + timestamp=ts, + status=status, + trial_id=trial.trial_id, + ) + + def _run_trial_on_trial_runner_finished_callback( + self, + result: TrialRunnerResult, + ) -> None: + """Callback to be called when a TrialRunner is finished with run_trial.""" + trial_runner_id = result.trial_runner_id + assert ( + trial_runner_id in self._trial_runners_status + ), f"Unexpected TrialRunner {trial_runner_id}." + assert ( + self._trial_runners_status[trial_runner_id] is not None + ), f"TrialRunner {trial_runner_id} should have been running." + # Mark the TrialRunner as finished. + self._trial_runners_status[result.trial_runner_id] = None + # TODO: save the results? + + def _run_trial_on_trial_runner_failed_closure( + self, + trial_runner_id: int, + ) -> Callable[[Any], None]: + # pylint: disable=no-self-use + """Callback to be called when a TrialRunner failed running run_trial.""" + + def _run_trial_on_trial_runner_failed(obj: Any) -> None: + """Callback to be called when a TrialRunner failed running run_trial.""" + # TODO: improve error handling here + _LOG.error("TrialRunner %d failed on run_trial: %s", trial_runner_id, obj) + raise RuntimeError(f"TrialRunner {trial_runner_id} failed on run_trial: {obj}") + + return _run_trial_on_trial_runner_failed + + def run_trial(self, trial: Storage.Trial) -> None: + """ + Set up and run a single Trial on a TrialRunner in a child process in the pool. + + The TrialRunner saves the results in the Storage. + """ + assert self._pool is not None + assert self._in_context + assert not is_child_process(), "This should be called in the parent process." + + # Run the given trial in the child process targeting a particular runner. + trial_runner_id = trial.trial_runner_id + assert trial_runner_id is not None, f"Trial {trial} has not been assigned a trial runner." + trial_runner = self._trial_runners[trial_runner_id] + + if self._trial_runners_status[trial_runner_id] is not None: + _LOG.info("TrialRunner %s is still active. Skipping trial %s.", trial_runner, trial) + + # Update the scheduler's trial bookkeeping. + super().run_trial(trial) + # Start the trial in a child process. 
+ self._trial_runners_status[trial_runner_id] = self._pool.apply_async( + # Call the run_trial function in the child process targeting + # a particular trial_runner. + self.run_trial_on_trial_runner, + args=( + self.storage, + self._experiment_id, + trial.trial_id, + trial_runner, + self.global_config, + ), + callback=self._run_trial_on_trial_runner_finished_callback, + error_callback=self._run_trial_on_trial_runner_failed_closure(trial_runner_id), + ) + + def run_schedule(self, running: bool = False) -> None: + """ + Runs the current schedule of Trials on parallel background workers. + + Check for :py:class:`.Trial`s with :py:attr:`.Status.PENDING` and an + assigned :py:attr:`~.Trial.trial_runner_id` in the queue and run them + with :py:meth:`~.Scheduler.run_trial`. + """ + + assert not is_child_process(), "This should be called in the parent process." + assert self._pool is not None + assert self._experiment is not None + + scheduled_trials = self._experiment.pending_trials( + datetime.now(UTC), + running=running, + trial_runner_assigned=True, + ) + scheduled_trials = [ + trial + for trial in scheduled_trials + if trial.trial_runner_id is not None and trial.trial_runner_id >= 0 + ] + + # Start each of the scheduled trials in the background. + for trial in scheduled_trials: + self.run_trial(trial) + # Now all available trials should be started in the background. + # We can move on to wait_for_trial_runners() to wait for some to finish. + + def wait_for_trial_runners(self, wait_all: bool = False) -> None: + """ + Wait for :py:class:`.TrialRunner`s to finish running. + + This is a blocking call that waits for either all TrialRunners or a + batch of idle TrialRunners, depending on ``wait_all``. + + Parameters + ---------- + wait_all : bool + If True, wait for all trial runners to finish. If False, wait for + :py:attr:`~.ParallelScheduler.idle_worker_scheduling_batch_size` number of + idle trial runners to finish. Default is False. + + Notes + ----- + This is called in the parent process and polls until enough + TrialRunners are idle. + """ + assert not is_child_process(), "This should be called in the parent process." + if wait_all: + # Wait for all trial runners to finish. + _LOG.info("Waiting for all trial runners to finish.") + while self._has_running_trial_runners(): + sleep(self._polling_interval) + assert not self._has_running_trial_runners(), "All trial runners should be idle." + else: + # Wait for a batch of idle trial runners to finish. + _LOG.info( + "Waiting for %d idle trial runners to finish.", + self._idle_worker_scheduling_batch_size, + ) + while self._get_idle_trial_runners_count() < self._idle_worker_scheduling_batch_size: + sleep(self._polling_interval) + assert self._get_idle_trial_runners_count() >= self._idle_worker_scheduling_batch_size + + @staticmethod + def teardown_trial_runner(trial_runner: TrialRunner) -> TrialRunnerResult: + """ + Tear down a specific :py:class:`.TrialRunner` (and its + :py:class:`~mlos_bench.environments.base_environment.Environment`) in a + :py:class:`.Pool` worker. + + Parameters + ---------- + trial_runner : TrialRunner + The :py:class:`.TrialRunner` to tear down. + + Returns + ------- + TrialRunnerResult + The result of the teardown operation, including the trial_runner_id + and the teardown results. + + Notes + ----- + This is called in the Pool worker process, so it must receive arguments + that are picklable.
+ To keep life simple we pass the entire TrialRunner object, which should + **not** be have entered its context (else it may have non-picklable + state), and make this a static method of the class to avoid needing to + pass the :py:class:`~.ParallelScheduler` instance. + Upon completion a callback is used to update the status of the + TrialRunner in the ParallelScheduler with the value in the + TrialRunnerResult. + """ + assert is_child_process(), "This should be called in a Pool worker." + with trial_runner: + return TrialRunnerResult( + trial_runner_id=trial_runner.trial_runner_id, + results=trial_runner.teardown(), + ) + + def _teardown_trial_runner_finished_callback(self, result: TrialRunnerResult) -> None: + """Callback to be called when a TrialRunner is finished with teardown.""" + assert not is_child_process(), "This should be called in the parent process." + trial_runner_id = result.trial_runner_id + assert ( + trial_runner_id in self._trial_runners_status + ), f"Unexpected TrialRunner {trial_runner_id}." + assert ( + self._trial_runners_status[trial_runner_id] is not None + ), f"TrialRunner {trial_runner_id} should have been running." + self._trial_runners_status[result.trial_runner_id] = None + # Nothing to do with the result. + + @staticmethod + def _teardown_trial_runner_failed_closure(trial_runner_id: int) -> Callable[[Any], None]: + """Callback to be called when a TrialRunner failed running teardown.""" + + def _teardown_trial_runner_failed(obj: Any) -> None: + """Callback to be called when a TrialRunner failed running teardown.""" + assert not is_child_process(), "This should be called in the parent process." + # TODO: improve error handling here + _LOG.error("TrialRunner %d failed to run teardown: %s", trial_runner_id, obj) + raise RuntimeError(f"TrialRunner {trial_runner_id} failed to run teardown: {obj}") + + return _teardown_trial_runner_failed + + def teardown(self) -> None: + assert not is_child_process(), "This should be called in the parent process." + assert self._pool is not None + assert self._in_context + assert not self._has_running_trial_runners(), "All trial runners should be idle." + if self._do_teardown: + # Call teardown on each TrialRunner in the pool in parallel. + for trial_runner_id, trial_runner in self._trial_runners.items(): + assert ( + self._trial_runners_status[trial_runner_id] is None + ), f"TrialRunner {trial_runner} is still active." + self._trial_runners_status[trial_runner_id] = self._pool.apply_async( + # Call the teardown function in the child process targeting + # a particular trial_runner. + self.teardown_trial_runner, + args=(trial_runner,), + callback=self._teardown_trial_runner_finished_callback, + error_callback=self._teardown_trial_runner_failed_closure(trial_runner_id), + ) + + # Wait for all trial runners to finish. + while self._has_running_trial_runners(): + sleep(self._polling_interval) + assert not self._has_running_trial_runners(), "All trial runners should be idle." + + def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None: + """ + Assign :py:class:`~.Storage.Trial`s to the first available and idle + :py:class:`.TrialRunner`. + + Parameters + ---------- + trials : Iterable[Storage.Trial] + """ + assert self._in_context + assert self.experiment is not None + assert not is_child_process(), "This should be called in the parent process." 
+ + scheduleable_trials: list[Storage.Trial] = list( + trial + for trial in trials + if trial.status.is_pending() and trial.trial_runner_id is None + ) + + idle_runner_ids = [ + trial_runner_id + for trial_runner_id, status in self._trial_runners_status.items() + if status is None + ] + + # Assign pending trials to idle runners + for trial, runner_id in zip(scheduleable_trials, idle_runner_ids): + trial.set_trial_runner(runner_id) diff --git a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py index f450b28b8f1..324841e45d6 100644 --- a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py @@ -15,30 +15,26 @@ class SyncScheduler(Scheduler): """A simple single-threaded synchronous optimization loop implementation.""" - def start(self) -> None: - """Start the optimization loop.""" - super().start() - - is_warm_up = self.optimizer.supports_preload - if not is_warm_up: - _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) - - not_done = True - while not_done: - _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) - self._run_schedule(is_warm_up) - not_done = self._schedule_new_optimizer_suggestions() - is_warm_up = False - def run_trial(self, trial: Storage.Trial) -> None: """ - Set up and run a single trial. + Set up and run a single :py:class:`~.Storage.Trial` on its + :py:class:`~.TrialRunner`. Save the results in the storage. """ super().run_trial(trial) # In the sync scheduler we run each trial on its own TrialRunner in sequence. trial_runner = self.get_trial_runner(trial) + if trial_runner is None: + _LOG.warning("No trial runner found for %s", trial) + return with trial_runner: trial_runner.run_trial(trial, self.global_config) _LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner) + + def wait_for_trial_runners(self, wait_all: bool = False) -> None: + # The default base implementation of wait_for_trial_runners() is a no-op + # because trial_runner.run_trial() is blocking so SyncScheduler only + # runs a single trial at a time. + # pylint: disable=useless-super-delegation + super().wait_for_trial_runners() diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 80eb696bc6d..2f5ccdfda14 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -20,6 +20,7 @@ from mlos_bench.services.types import SupportsConfigLoading from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups +from mlos_bench.tunables.tunable_types import TunableValue _LOG = logging.getLogger(__name__) @@ -117,6 +118,8 @@ def __init__(self, trial_runner_id: int, env: Environment) -> None: assert self._env.parameters["trial_runner_id"] == self._trial_runner_id self._in_context = False self._is_running = False + # TODO: Check and see if we need to delay creating the event loop + # context until context entry. self._event_loop_context = EventLoopContext() def __repr__(self) -> str: @@ -168,7 +171,7 @@ def run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, - ) -> None: + ) -> tuple[Status, datetime, dict[str, TunableValue] | None]: """ Run a single trial on this TrialRunner's Environment and stores the results in the backend Trial Storage. 
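With this signature change, a scheduler can consume the trial outcome directly from the call instead of re-reading it from Storage; this is the pattern `ParallelScheduler.run_trial_on_trial_runner` uses above. A hedged sketch of the calling side (names as in this diff):

```python
# Inside a scheduler that has already assigned `trial` to `trial_runner` (sketch).
with trial_runner:
    (status, timestamp, results) = trial_runner.run_trial(trial, global_config)
_LOG.info("Trial %s finished at %s with status %s: %s", trial, timestamp, status, results)
```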
@@ -198,9 +201,10 @@ def run_trial( if not self.environment.setup(trial.tunables, trial.config(global_config)): _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) # FIXME: Use the actual timestamp from the environment. - _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, Status.FAILED) - trial.update(Status.FAILED, datetime.now(UTC)) - return + (status, timestamp, results) = (Status.FAILED, datetime.now(UTC), None) + _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, status) + trial.update(status, timestamp) + return (status, timestamp, results) # TODO: start background status polling of the environments in the event loop. @@ -221,6 +225,8 @@ def run_trial( self._is_running = False + return (status, timestamp, results) + def teardown(self) -> None: """ Tear down the Environment. diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index f2d393994f7..da47d839634 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -22,9 +22,11 @@ Base interface for accessing the stored benchmark trial data. """ +from __future__ import annotations + import logging from abc import ABCMeta, abstractmethod -from collections.abc import Iterator, Mapping +from collections.abc import Iterable, Iterator, Mapping from contextlib import AbstractContextManager as ContextManager from datetime import datetime from types import TracebackType @@ -94,6 +96,25 @@ def experiments(self) -> dict[str, ExperimentData]: A dictionary of the experiments' data, keyed by experiment id. """ + @abstractmethod + def get_experiment_by_id( + self, + experiment_id: str, + ) -> Storage.Experiment | None: + """ + Gets an Experiment by its ID. + + Parameters + ---------- + experiment_id : str + ID of the Experiment to retrieve. + + Returns + ------- + experiment : Storage.Experiment | None + The Experiment object, or None if it doesn't exist. + """ + @abstractmethod def experiment( # pylint: disable=too-many-arguments self, @@ -104,7 +125,7 @@ def experiment( # pylint: disable=too-many-arguments description: str, tunables: TunableGroups, opt_targets: dict[str, Literal["min", "max"]], - ) -> "Storage.Experiment": + ) -> Storage.Experiment: """ Create a new experiment in the storage. @@ -161,7 +182,7 @@ def __init__( # pylint: disable=too-many-arguments self._opt_targets = opt_targets self._in_context = False - def __enter__(self) -> "Storage.Experiment": + def __enter__(self) -> Storage.Experiment: """ Enter the context of the experiment. @@ -284,10 +305,20 @@ def load_telemetry(self, trial_id: int) -> list[tuple[datetime, str, Any]]: Telemetry data. """ + @abstractmethod + def get_longest_prefix_finished_trial_id(self) -> int: + """ + Calculate the last trial ID for the experiment. + + This is used to determine the last trial ID that finished (failed or + successful) such that all Trials before it are also finished. + """ + @abstractmethod def load( self, last_trial_id: int = -1, + omit_registered_trial_ids: Iterable[int] | None = None, ) -> tuple[list[int], list[dict], list[dict[str, Any] | None], list[Status]]: """ Load (tunable values, benchmark scores, status) to warm-up the optimizer. @@ -296,10 +327,20 @@ def load( that were scheduled *after* the given trial ID. Otherwise, return data from ALL merged-in experiments and attempt to impute the missing tunable values. + Additionally, if `omit_registered_trial_ids` is provided, omit the + trials matching those ids. 
+ + The parameters together allow us to efficiently load data from + finished trials that we haven't registered with the Optimizer yet + for bulk registering. + Parameters ---------- last_trial_id : int (Optional) Trial ID to start from. + omit_registered_trial_ids : Iterable[int] | None = None, + (Optional) List of trial IDs to omit. If None, load all trials. + Returns ------- @@ -307,24 +348,52 @@ def load( Trial ids, Tunable values, benchmark scores, and status of the trials. """ + @abstractmethod + def get_trial_by_id( + self, + trial_id: int, + ) -> Storage.Trial | None: + """ + Gets a Trial by its ID. + + Parameters + ---------- + trial_id : int + ID of the Trial to retrieve for this Experiment. + + Returns + ------- + trial : Storage.Trial | None + The Trial object, or None if it doesn't exist. + """ + @abstractmethod def pending_trials( self, timestamp: datetime, *, running: bool, - ) -> Iterator["Storage.Trial"]: + trial_runner_assigned: bool | None = None, + ) -> Iterator[Storage.Trial]: """ - Return an iterator over the pending trials that are scheduled to run on or - before the specified timestamp. + Return an iterator over the :py:class:`~.Storage.Trial`s that are + :py:attr:`~.Status.PENDING` and have a scheduled + :py:attr:`~.Storage.Trial.ts_start` time to run on or before the specified + timestamp. Parameters ---------- timestamp : datetime.datetime - The time in UTC to check for scheduled trials. + The time in UTC to check for scheduled Trials. running : bool - If True, include the trials that are already running. + If True, include the Trials that are also + :py:attr:`~.Status.RUNNING` or :py:attr:`~.Status.READY`. Otherwise, return only the scheduled trials. + trial_runner_assigned : bool | None + If True, include the Trials that are assigned to a + :py:class:`~.TrialRunner`. If False, return only the trials + that are not assigned to any :py:class:`~.TrialRunner`. + If None, return all trials regardless of their assignment. Returns ------- @@ -337,7 +406,7 @@ def new_trial( tunables: TunableGroups, ts_start: datetime | None = None, config: dict[str, Any] | None = None, - ) -> "Storage.Trial": + ) -> Storage.Trial: """ Create a new experiment run in the storage. @@ -374,7 +443,7 @@ def _new_trial( tunables: TunableGroups, ts_start: datetime | None = None, config: dict[str, Any] | None = None, - ) -> "Storage.Trial": + ) -> Storage.Trial: """ Create a new experiment run in the storage. diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index eb47de7d714..dfe13d397fc 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -8,7 +8,7 @@ import hashlib import logging -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from datetime import datetime from typing import Any, Literal @@ -153,13 +153,43 @@ def load_telemetry(self, trial_id: int) -> list[tuple[datetime, str, Any]]: for row in cur_telemetry.fetchall() ] + # TODO: Add a test for this method. + def get_longest_prefix_finished_trial_id(self) -> int: + with self._engine.connect() as conn: + # Get the first (minimum) trial ID with an unfinished status. 
+ first_unfinished_trial_id_stmt = ( + self._schema.trial.select() + .with_only_columns( + func.min(self._schema.trial.c.trial_id), + ) + .where( + self._schema.trial.c.exp_id == self._experiment_id, + func.not_( + self._schema.trial.c.status.in_( + [ + Status.SUCCEEDED.name, + Status.FAILED.name, + Status.TIMED_OUT.name, + ] + ), + ), + ) + ) + + max_trial_id = conn.execute(first_unfinished_trial_id_stmt).scalar() + if max_trial_id is None: + return -1 + # Return one less than the first unfinished trial ID - it should be + # finished (or not exist, which is fine as a limit). + return int(max_trial_id) - 1 + def load( self, last_trial_id: int = -1, + omit_registered_trial_ids: Iterable[int] | None = None, ) -> tuple[list[int], list[dict], list[dict[str, Any] | None], list[Status]]: - with self._engine.connect() as conn: - cur_trials = conn.execute( + stmt = ( self._schema.trial.select() .with_only_columns( self._schema.trial.c.trial_id, @@ -182,6 +212,15 @@ def load( ) ) + # TODO: Add a test for this parameter. + + # Note: if we have a very large number of trials, this may encounter + # SQL text length limits, so we may need to chunk this. + if omit_registered_trial_ids is not None: + stmt = stmt.where(self._schema.trial.c.trial_id.notin_(omit_registered_trial_ids)) + + cur_trials = conn.execute(stmt) + trial_ids: list[int] = [] configs: list[dict[str, Any]] = [] scores: list[dict[str, Any] | None] = [] @@ -235,25 +274,75 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d row._tuple() for row in cur_result.fetchall() # pylint: disable=protected-access ) - def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]: - timestamp = utcify_timestamp(timestamp, origin="local") - _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) - if running: - pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name] - else: - pending_status = [Status.PENDING.name] + # TODO: Needs tests. 
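To make the semantics of `get_longest_prefix_finished_trial_id()` above concrete, here is a small pure-Python equivalent of the SQL query; the helper name and the `dict[int, bool]` input shape are illustrative only and not part of the storage API.

```python
def longest_prefix_finished_trial_id(trial_status: dict[int, bool]) -> int:
    """Pure-Python mirror of the SQL query above.

    ``trial_status`` maps trial_id -> True when that trial is finished
    (SUCCEEDED, FAILED, or TIMED_OUT).
    """
    unfinished = [trial_id for trial_id, finished in trial_status.items() if not finished]
    if not unfinished:
        # Mirrors the SQL fallback above: no unfinished trial found -> -1.
        return -1
    # One less than the first (smallest) unfinished trial ID.
    return min(unfinished) - 1


# Trials 1-3 are finished, trial 4 is still running, trial 5 is finished,
# so the longest fully-finished prefix ends at trial 3.
assert longest_prefix_finished_trial_id({1: True, 2: True, 3: True, 4: False, 5: True}) == 3
assert longest_prefix_finished_trial_id({}) == -1
```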
+ def get_trial_by_id( + self, + trial_id: int, + ) -> Storage.Trial | None: with self._engine.connect() as conn: - cur_trials = conn.execute( + trial = conn.execute( self._schema.trial.select().where( self._schema.trial.c.exp_id == self._experiment_id, - ( - self._schema.trial.c.ts_start.is_(None) - | (self._schema.trial.c.ts_start <= timestamp) - ), - self._schema.trial.c.ts_end.is_(None), - self._schema.trial.c.status.in_(pending_status), + self._schema.trial.c.trial_id == trial_id, ) + ).fetchone() + if trial is None: + return None + tunables = self._get_key_val( + conn, + self._schema.config_param, + "param", + config_id=trial.config_id, + ) + config = self._get_key_val( + conn, + self._schema.trial_param, + "param", + exp_id=self._experiment_id, + trial_id=trial_id, + ) + return Trial( + engine=self._engine, + schema=self._schema, + # Reset .is_updated flag after the assignment: + tunables=self._tunables.copy().assign(tunables).reset(), + experiment_id=self._experiment_id, + trial_id=trial_id, + config_id=trial.config_id, + trial_runner_id=trial.trial_runner_id, + opt_targets=self._opt_targets, + config=config, + ) + + def pending_trials( + self, + timestamp: datetime, + *, + running: bool = False, + trial_runner_assigned: bool | None = None, + ) -> Iterator[Storage.Trial]: + if running: + statuses = [Status.PENDING, Status.READY, Status.RUNNING] + else: + statuses = [Status.PENDING] + timestamp = utcify_timestamp(timestamp, origin="local") + _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) + with self._engine.connect() as conn: + stmt = self._schema.trial.select().where( + self._schema.trial.c.exp_id == self._experiment_id, + ( + self._schema.trial.c.ts_start.is_(None) + | (self._schema.trial.c.ts_start <= timestamp) + ), + self._schema.trial.c.ts_end.is_(None), + self._schema.trial.c.status.in_([s.name for s in statuses]), ) + if trial_runner_assigned: + stmt = stmt.where(self._schema.trial.c.trial_runner_id.isnot(None)) + elif trial_runner_assigned is False: + stmt = stmt.where(self._schema.trial.c.trial_runner_id.is_(None)) + # else: # No filtering by trial_runner_id + cur_trials = conn.execute(stmt) for trial in cur_trials.fetchall(): tunables = self._get_key_val( conn, diff --git a/mlos_bench/mlos_bench/storage/sql/storage.py b/mlos_bench/mlos_bench/storage/sql/storage.py index b3bf63d0edb..4d606d624e4 100644 --- a/mlos_bench/mlos_bench/storage/sql/storage.py +++ b/mlos_bench/mlos_bench/storage/sql/storage.py @@ -7,7 +7,7 @@ import logging from typing import Literal -from sqlalchemy import URL, create_engine +from sqlalchemy import URL, Engine, create_engine from mlos_bench.services.base_service import Service from mlos_bench.storage.base_experiment_data import ExperimentData @@ -32,21 +32,44 @@ def __init__( service: Service | None = None, ): super().__init__(config, global_config, service) - lazy_schema_create = self._config.pop("lazy_schema_create", False) + self._lazy_schema_create = self._config.pop("lazy_schema_create", False) self._log_sql = self._config.pop("log_sql", False) self._url = URL.create(**self._config) self._repr = f"{self._url.get_backend_name()}:{self._url.database}" + self._engine: Engine + self._db_schema: DbSchema + self._schema_created = False + self._schema_updated = False + self._init_engine() + + def _init_engine(self) -> None: + """Initialize the SQLAlchemy engine.""" + # Called from __init__ and again from __setstate__ to recreate the engine + # and schema objects (which cannot be pickled) after unpickling.
_LOG.info("Connect to the database: %s", self) self._engine = create_engine(self._url, echo=self._log_sql) self._db_schema = DbSchema(self._engine) - self._schema_created = False - self._schema_updated = False - if not lazy_schema_create: + if not self._lazy_schema_create: assert self._schema self.update_schema() else: _LOG.info("Using lazy schema create for database: %s", self) + # Make the object picklable. + + def __getstate__(self) -> dict: + """Return the state of the object for pickling.""" + state = self.__dict__.copy() + # Don't pickle the engine, as it cannot be pickled. + state.pop("_engine", None) + state.pop("_db_schema", None) + return state + + def __setstate__(self, state: dict) -> None: + """Restore the state of the object from pickling.""" + self.__dict__.update(state) + # Recreate the engine and schema. + self._init_engine() + @property def _schema(self) -> DbSchema: """Lazily create schema upon first access.""" @@ -66,6 +89,33 @@ def update_schema(self) -> None: def __repr__(self) -> str: return self._repr + # TODO: Implement me: + # TODO: Needs tests. + def get_experiment_by_id(self, experiment_id): + with self._engine.connect() as conn: + # TODO: need to join with the trial table to get the max trial_id, + # objectives to get the opt_targets, and tunable_params to get the + # tunables. + cur_exp = conn.execute( + self._schema.experiment.select().where( + self._schema.experiment.c.exp_id == experiment_id, + ) + ) + exp = cur_exp.fetchone() + if exp is None: + return None + return Experiment( + engine=self._engine, + schema=self._schema, + experiment_id=exp.exp_id, + trial_id=-1, + tunables=TunableGroups(), + description=exp.description, + root_env_config=exp.root_env_config, + git_repo=exp.git_repo, + git_commit=exp.git_commit, + ) + def experiment( # pylint: disable=too-many-arguments self, *, diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc new file mode 100644 index 00000000000..4ea6bdbf170 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc @@ -0,0 +1,6 @@ +{ + "class": "mlos_bench.schedulers.ParallelScheduler", + "config": { + "trial_config_repeat_count": 0 + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc new file mode 100644 index 00000000000..06729a4f368 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc @@ -0,0 +1,5 @@ +{ + "class": "mlos_bench.schedulers.ParallelScheduler", + "config": { + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc new file mode 100644 index 00000000000..68623ee611f --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc @@ -0,0 +1,6 @@ +{ + "class": "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler", + "config": { + "extra": "unsupported" + } +} diff --git 
a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc new file mode 100644 index 00000000000..90bac645032 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc @@ -0,0 +1,12 @@ +{ + "$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", + "class": "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler", + "config": { + "trial_config_repeat_count": 3, + "teardown": false, + "experiment_id": "MyExperimentName", + "config_id": 1, + "trial_id": 1, + "max_trials": 100 + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc new file mode 100644 index 00000000000..1b0e39c3305 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc @@ -0,0 +1,7 @@ +{ + "class": "mlos_bench.schedulers.ParallelScheduler", + "config": { + "trial_config_repeat_count": 3, + "teardown": false + } +} diff --git a/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py b/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py index 6294ee8bf3b..b84245732f5 100644 --- a/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py +++ b/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py @@ -18,7 +18,7 @@ from mlos_bench.launcher import Launcher from mlos_bench.optimizers import MlosCoreOptimizer, OneShotOptimizer from mlos_bench.os_environ import environ -from mlos_bench.schedulers import SyncScheduler +from mlos_bench.schedulers import ParallelScheduler, SyncScheduler from mlos_bench.services.types import ( SupportsAuth, SupportsConfigLoading, @@ -307,5 +307,67 @@ def test_launcher_args_parse_3(config_paths: list[str]) -> None: assert launcher.scheduler.trial_config_repeat_count == 2 +def test_launcher_args_parse_4(config_paths: list[str]) -> None: + """ + Test that using multiple --globals arguments works and that multiple space-separated + options to --config-paths work, plus --num-trial-runners and a ParallelScheduler config. + + Check $var expansion and Environment loading. + """ + # Here we have multiple paths following --config-paths and --service. + cli_args = ( + "--config-paths " + + " ".join(config_paths) + + " --num-trial-runners 5" + + " --service services/remote/mock/mock_auth_service.jsonc" + " services/remote/mock/mock_remote_exec_service.jsonc" + " --scheduler schedulers/parallel_scheduler.jsonc" + f" --environment {ENV_CONF_PATH}" + " --globals globals/global_test_config.jsonc" + " --globals globals/global_test_extra_config.jsonc" + " --test_global_value_2 from-args" + ) + launcher = _get_launcher(__name__, cli_args) + # Check some additional features of the parent service + assert isinstance(launcher.service, SupportsAuth) # from --service + assert isinstance(launcher.service, SupportsRemoteExec) # from --service + # Check that the first --globals file is loaded and $var expansion is handled. + assert launcher.global_config["experiment_id"] == "MockExperiment" + assert launcher.global_config["testVmName"] == "MockExperiment-vm" + # Check that secondary expansion also works. + assert launcher.global_config["testVnetName"] == "MockExperiment-vm-vnet" + # Check that the second --globals file is loaded.
+ assert launcher.global_config["test_global_value"] == "from-file" + # Check overriding values in a file from the command line. + assert launcher.global_config["test_global_value_2"] == "from-args" + # Check that we can expand a $var in a config file that references an environment variable. + assert path_join(launcher.global_config["pathVarWithEnvVarRef"], abs_path=True) == path_join( + os.getcwd(), "foo", abs_path=True + ) + assert launcher.global_config["varWithEnvVarRef"] == f"user:{getuser()}" + assert launcher.teardown + # Make sure we have the right number of trial runners. + assert len(launcher.trial_runners) == 5 # from cli args + # Check that the environment that got loaded looks to be of the right type. + env_config = launcher.config_loader.load_config(ENV_CONF_PATH, ConfigSchema.ENVIRONMENT) + assert env_config["class"] == "mlos_bench.environments.mock_env.MockEnv" + # All TrialRunners should get the same Environment. + assert all( + check_class_name(trial_runner.environment, env_config["class"]) + for trial_runner in launcher.trial_runners + ) + # Check that the optimizer looks right. + assert isinstance(launcher.optimizer, OneShotOptimizer) + # Check that the optimizer got initialized with defaults. + assert launcher.optimizer.tunable_params.is_defaults() + assert launcher.optimizer.max_suggestions == 1 # value for OneShotOptimizer + # Check that we pick up the right scheduler config: + assert isinstance(launcher.scheduler, ParallelScheduler) + assert ( + launcher.scheduler.trial_config_repeat_count == 3 + ) # from the custom parallel_scheduler.jsonc config + assert launcher.scheduler.max_trials == -1 + + if __name__ == "__main__": pytest.main([__file__, "-n0"]) diff --git a/mlos_bench/mlos_bench/tests/storage/conftest.py b/mlos_bench/mlos_bench/tests/storage/conftest.py index a1437052823..212bf4acd4c 100644 --- a/mlos_bench/mlos_bench/tests/storage/conftest.py +++ b/mlos_bench/mlos_bench/tests/storage/conftest.py @@ -16,5 +16,7 @@ exp_no_tunables_storage = sql_storage_fixtures.exp_no_tunables_storage mixed_numerics_exp_storage = sql_storage_fixtures.mixed_numerics_exp_storage exp_data = sql_storage_fixtures.exp_data +parallel_exp_data = sql_storage_fixtures.parallel_exp_data + exp_no_tunables_data = sql_storage_fixtures.exp_no_tunables_data mixed_numerics_exp_data = sql_storage_fixtures.mixed_numerics_exp_data diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index cb83bffd4ff..3cec974fcf5 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -4,12 +4,13 @@ # """Test fixtures for mlos_bench storage.""" -from collections.abc import Generator +from collections.abc import Callable, Generator from random import seed as rand_seed import pytest from mlos_bench.optimizers.mock_optimizer import MockOptimizer +from mlos_bench.schedulers.parallel_scheduler import ParallelScheduler from mlos_bench.schedulers.sync_scheduler import SyncScheduler from mlos_bench.schedulers.trial_runner import TrialRunner from mlos_bench.services.config_persistence import ConfigPersistenceService @@ -109,6 +110,94 @@ def mixed_numerics_exp_storage( assert not exp._in_context +def _parallel_dummy_run_exp( + storage: SqlStorage, + exp: SqlStorage.Experiment, +) -> ExperimentData: + """ + Generates data by doing a simulated run of the given experiment. + + Parameters + ---------- + storage : SqlStorage + The storage object to use.
+ exp : SqlStorage.Experiment + The experiment to "run". + Note: this particular object won't be updated, but a new one will be created + from its metadata. + + Returns + ------- + ExperimentData + The data generated by the simulated run. + """ + # pylint: disable=too-many-locals + + rand_seed(SEED) + + trial_runners: list[TrialRunner] = [] + global_config: dict = {} + config_loader = ConfigPersistenceService() + tunable_params = ",".join(f'"{name}"' for name in exp.tunables.get_covariant_group_names()) + mock_env_json = f""" + {{ + "class": "mlos_bench.environments.mock_env.MockEnv", + "name": "Test Env", + "config": {{ + "tunable_params": [{tunable_params}], + "mock_env_seed": {SEED}, + "mock_env_range": [60, 120], + "mock_env_metrics": ["score"] + }} + }} + """ + trial_runners = TrialRunner.create_from_json( + config_loader=config_loader, + global_config=global_config, + tunable_groups=exp.tunables, + env_json=mock_env_json, + svcs_json=None, + num_trial_runners=TRIAL_RUNNER_COUNT, + ) + + opt = MockOptimizer( + tunables=exp.tunables, + config={ + "optimization_targets": exp.opt_targets, + "seed": SEED, + # This should be the default, so we leave it omitted for now to test the default. + # But the test logic relies on this (e.g., trial 1 is config 1 is the + # default values for the tunable params) + # "start_with_defaults": True, + "max_suggestions": MAX_TRIALS, + }, + global_config=global_config, + ) + + scheduler = ParallelScheduler( + # All config values can be overridden from global config + config={ + "experiment_id": exp.experiment_id, + "trial_id": exp.trial_id, + "config_id": -1, + "trial_config_repeat_count": CONFIG_TRIAL_REPEAT_COUNT, + "max_trials": MAX_TRIALS, + }, + global_config=global_config, + trial_runners=trial_runners, + optimizer=opt, + storage=storage, + root_env_config=exp.root_env_config, + ) + + # Add some trial data to that experiment by "running" it. 
+ with scheduler: + scheduler.start() + scheduler.teardown() + + return storage.experiments[exp.experiment_id] + + def _dummy_run_exp( storage: SqlStorage, exp: SqlStorage.Experiment, @@ -197,13 +286,49 @@ def _dummy_run_exp( return storage.experiments[exp.experiment_id] +def _exp_data( + storage: SqlStorage, + exp_storage: SqlStorage.Experiment, + run_exp: Callable[[SqlStorage, SqlStorage.Experiment], ExperimentData] = _dummy_run_exp, +) -> ExperimentData: + """Test fixture for ExperimentData.""" + return run_exp(storage, exp_storage) + + +def _exp_no_tunables_data( + storage: SqlStorage, + exp_no_tunables_storage: SqlStorage.Experiment, + run_exp: Callable[[SqlStorage, SqlStorage.Experiment], ExperimentData] = _dummy_run_exp, +) -> ExperimentData: + """Test fixture for ExperimentData with no tunable configs.""" + return run_exp(storage, exp_no_tunables_storage) + + +def _mixed_numerics_exp_data( + storage: SqlStorage, + mixed_numerics_exp_storage: SqlStorage.Experiment, + run_exp: Callable[[SqlStorage, SqlStorage.Experiment], ExperimentData] = _dummy_run_exp, +) -> ExperimentData: + """Test fixture for ExperimentData with mixed numerical tunable types.""" + return run_exp(storage, mixed_numerics_exp_storage) + + @pytest.fixture def exp_data( storage: SqlStorage, exp_storage: SqlStorage.Experiment, ) -> ExperimentData: """Test fixture for ExperimentData.""" - return _dummy_run_exp(storage, exp_storage) + return _exp_data(storage, exp_storage) + + +@pytest.fixture +def parallel_exp_data( + storage: SqlStorage, + exp_storage: SqlStorage.Experiment, +) -> ExperimentData: + """Test fixture for ExperimentData with parallel scheduling.""" + return _exp_data(storage, exp_storage, run_exp=_parallel_dummy_run_exp) @pytest.fixture @@ -212,7 +337,7 @@ def exp_no_tunables_data( exp_no_tunables_storage: SqlStorage.Experiment, ) -> ExperimentData: """Test fixture for ExperimentData with no tunable configs.""" - return _dummy_run_exp(storage, exp_no_tunables_storage) + return _exp_no_tunables_data(storage, exp_no_tunables_storage) @pytest.fixture @@ -221,4 +346,4 @@ def mixed_numerics_exp_data( mixed_numerics_exp_storage: SqlStorage.Experiment, ) -> ExperimentData: """Test fixture for ExperimentData with mixed numerical tunable types.""" - return _dummy_run_exp(storage, mixed_numerics_exp_storage) + return _mixed_numerics_exp_data(storage, mixed_numerics_exp_storage) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py index aaf545c787f..804f0ef9d00 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py @@ -3,14 +3,17 @@ # Licensed under the MIT License. 
# """Unit tests for scheduling trials for some future time.""" -from collections.abc import Iterator +from collections.abc import Callable, Iterator from datetime import datetime, timedelta +from typing import Any +import numpy as np from pytz import UTC from mlos_bench.environments.status import Status from mlos_bench.storage.base_experiment_data import ExperimentData from mlos_bench.storage.base_storage import Storage +from mlos_bench.storage.base_trial_data import TrialData from mlos_bench.tests.storage import ( CONFIG_COUNT, CONFIG_TRIAL_REPEAT_COUNT, @@ -173,3 +176,29 @@ def test_rr_scheduling(exp_data: ExperimentData) -> None: assert ( trial.trial_runner_id == expected_runner_id ), f"Expected trial_runner_id {expected_runner_id} for {trial}" + + +def test_parallel_scheduling(parallel_exp_data: ExperimentData) -> None: + """ + Check that the scheduler schedules all of the Trials across the TrialRunners. + + Note that we can no longer assume the order of the trials, since they can complete + in any order. + """ + extractor: Callable[[Callable[[TrialData], Any]], list[Any]] = lambda fn: [ + fn(parallel_exp_data.trials[id]) + for id in range(1, CONFIG_COUNT * CONFIG_TRIAL_REPEAT_COUNT + 1) + ] + + trial_ids = extractor(lambda trial: trial.trial_id) + assert set(trial_ids) == set(range(1, CONFIG_COUNT * CONFIG_TRIAL_REPEAT_COUNT + 1)) + + config_ids = extractor(lambda trial: trial.tunable_config_id) + unique_config_ids, config_counts = np.unique(config_ids, return_counts=True) + assert len(unique_config_ids) == CONFIG_COUNT + assert all(count == CONFIG_TRIAL_REPEAT_COUNT for count in config_counts) + + repeat_nums = extractor(lambda trial: trial.metadata_dict["repeat_i"]) + unique_repeat_nums, repeat_nums_counts = np.unique(repeat_nums, return_counts=True) + assert len(unique_repeat_nums) == CONFIG_TRIAL_REPEAT_COUNT + assert all(count == CONFIG_COUNT for count in repeat_nums_counts)
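As a footnote on the counting idiom used by `test_parallel_scheduling` above: `numpy.unique(..., return_counts=True)` returns the distinct values (sorted) together with how many times each occurs, which is what lets the test verify the per-config repeat counts without assuming any trial ordering. A tiny standalone illustration with made-up data:

```python
import numpy as np

# Made-up data: 3 distinct config ids, each repeated 3 times (order scrambled).
config_ids = [2, 1, 3, 1, 2, 3, 3, 1, 2]
unique_ids, counts = np.unique(config_ids, return_counts=True)
assert list(unique_ids) == [1, 2, 3]  # values are returned sorted
assert all(count == 3 for count in counts)
```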