diff --git a/.github/workflows/components-integration-tests.yaml b/.github/workflows/components-integration-tests.yaml
index 03480aec8..480753351 100644
--- a/.github/workflows/components-integration-tests.yaml
+++ b/.github/workflows/components-integration-tests.yaml
@@ -24,8 +24,6 @@ jobs:
             platform: ubuntu-latest
           - scheduler: "local_docker"
             platform: linux.24_04.4x
-          - scheduler: "ray"
-            platform: ubuntu-latest
       fail-fast: false
     runs-on: ${{ matrix.platform }}
     permissions:
diff --git a/.github/workflows/kfp-integration-tests.yaml b/.github/workflows/kfp-integration-tests.yaml
deleted file mode 100644
index 4b075ebd4..000000000
--- a/.github/workflows/kfp-integration-tests.yaml
+++ /dev/null
@@ -1,34 +0,0 @@
-name: KFP Integration Tests
-
-on:
-  push:
-    branches:
-      - main
-  pull_request:
-
-jobs:
-  kfp-launch:
-    runs-on: inux.24_04.16x
-    steps:
-      - name: Setup Python
-        uses: actions/setup-python@v2
-        with:
-          python-version: "3.10"
-          architecture: x64
-      - name: Checkout TorchX
-        uses: actions/checkout@v2
-      - name: Install dependencies
-        run: |
-          set -eux
-          pip install -r dev-requirements.txt
-          python setup.py install
-      - name: Start Kubernetes
-        run: |
-          scripts/setup_minikube.sh
-          scripts/setup_kfp.sh
-
-      - name: Run KFP Integration Tests
-        env:
-          KFP_NAMESPACE: kubeflow
-          INTEGRATION_TEST_STORAGE: torchx_minio://torchx/tests
-        run: scripts/kfpint.py --container_repo localhost:5000/torchx
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3caffe486..ca42445f1 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -39,11 +39,6 @@ lintrunner init
 lintrunner -a
 ```
 
-## Integration Tests
-
-See the [KFP integration test](scripts/kfpint.py) file for more details on setup
-and running them.
-
 ## License
 By contributing to TorchX, you agree that your contributions will be licensed
 under the LICENSE file in the root directory of this source tree.
diff --git a/README.md b/README.md
index 1057bb370..cecac46de 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,6 @@ TorchX currently supports:
 * AWS Batch
 * Docker
 * Local
-* Ray (prototype)
 * GCP Batch (prototype)
 
 Need a scheduler not listed? [Let us know!](https://github.com/pytorch/torchx/issues?q=is%3Aopen+is%3Aissue+label%3Ascheduler-request)
@@ -55,15 +54,9 @@ pip install torchx
 # install torchx sdk and CLI -- all dependencies
 pip install "torchx[dev]"
 
-# install torchx kubeflow pipelines (kfp) support
-pip install "torchx[kfp]"
-
 # install torchx Kubernetes / Volcano support
 pip install "torchx[kubernetes]"
 
-# install torchx Ray support
-pip install "torchx[ray]"
-
 # install torchx GCP Batch support
 pip install "torchx[gcp_batch]"
 ```
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 0abe53c1d..05d8bf3d6 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -12,9 +12,6 @@ google-cloud-logging==3.10.0
 google-cloud-runtimeconfig==0.34.0
 hydra-core
 ipython
-kfp==1.8.22
-# pin protobuf to the version that is required by kfp
-protobuf==3.20.3
 mlflow-skinny
 moto~=5.0.8
 pyre-extensions
@@ -32,7 +29,6 @@ torchtext==0.18.0
 torchvision==0.23.0
 typing-extensions
 ts==0.5.1
-ray[default]
 wheel
 
 # lint (linter versions are managed by lintrunner)
@@ -45,4 +41,3 @@ grpcio==1.62.1
 grpcio-status==1.48.1
 googleapis-common-protos==1.63.0
 google-api-core==2.18.0
-protobuf==3.20.3 # kfp==1.8.22 needs protobuf < 4
diff --git a/docs/source/basics.rst b/docs/source/basics.rst
index 2673839e7..34d01d7c8 100644
--- a/docs/source/basics.rst
+++ b/docs/source/basics.rst
@@ -14,8 +14,7 @@ The top level modules in TorchX are:
 4. :mod:`torchx.cli`: CLI tool
 5. :mod:`torchx.runner`: given an app spec, submits the app as a job on a scheduler
 6. :mod:`torchx.schedulers`: backend job schedulers that the runner supports
-7. :mod:`torchx.pipelines`: adapters that convert the given app spec to a "stage" in an ML pipeline platform
-8. :mod:`torchx.runtime`: util and abstraction libraries you can use in authoring apps (not app spec)
+7. :mod:`torchx.runtime`: util and abstraction libraries you can use in authoring apps (not app spec)
 
 Below is a UML diagram
 
@@ -32,8 +31,7 @@ the actual application. In scheduler lingo, this is a ``JobDefinition`` and
 a similar concept in Kubernetes is the ``spec.yaml``. To disambiguate between
 the application binary (logic) and the spec, we typically refer
 to a TorchX ``AppDef`` as an "app spec" or ``specs.AppDef``. It
-is the common interface understood by ``torchx.runner``
-and ``torchx.pipelines`` allowing you to run your app as a standalone job
+is the common interface understood by ``torchx.runner`` allowing you to run your app as a standalone job
 or as a stage in an ML pipeline.
 
 Below is a simple example of an ``specs.AppDef`` that echos "hello world"
@@ -119,10 +117,6 @@ can be achieved through python function composition rather than object compositi
 
 However **we do not recommend component composition** for maintainability purposes.
 
-**PROTIP 2:** To define dependencies between components, use a pipelining DSL.
-See :ref:`basics:Pipeline Adapters` section below to understand how TorchX components
-are used in the context of pipelines.
-
 Before authoring your own component, browse through the library of
 :ref:`Components` that are included with TorchX to see if one fits your needs.
 
@@ -141,34 +135,11 @@ There are two ways to access runners in TorchX:
 See :ref:`Schedulers` for a list of schedulers that the runner can launch apps to.
 
-Pipeline Adapters
-~~~~~~~~~~~~~~~~~~~~~~
-While runners launch components as standalone jobs, ``torchx.pipelines``
-makes it possible to plug components into an ML pipeline/workflow. For a
-specific target pipeline platform (e.g. kubeflow pipelines), TorchX
-defines an adapter that converts a TorchX app spec to whatever the
-"stage" representation is in the target platform. For instance,
-``torchx.pipelines.kfp`` adapter for kubeflow pipelines converts an
-app spec to a ``kfp.ContainerOp`` (or more accurately, a kfp "component spec" yaml).
-
-
-In most cases an app spec would map to a "stage" (or node) in a pipeline.
-However advanced components, especially those that have a mini control flow
-of its own (e.g. HPO), may map to a "sub-pipeline" or an "inline-pipeline".
-The exact semantics of how these advanced components map to the pipeline
-is dependent on the target pipeline platform. For example, if the
-pipeline DSL allows dynamically adding stages to a pipeline from an upstream
-stage, then TorchX may take advantage of such feature to "inline" the
-sub-pipeline to the main pipeline. TorchX generally tries its best to adapt
-app specs to the **most canonical** representation in the target pipeline platform.
-
-See :ref:`Pipelines` for a list of supported pipeline platforms.
-
 Runtime
 ~~~~~~~~
 
 .. important:: ``torchx.runtime`` is by no means is a requirement to use TorchX.
    If your infrastructure is fixed and you don't need your application
-   to be portable across different types of schedulers and pipelines,
+   to be portable across different types of schedulers,
    you can skip this section.
 
 Your application (not the app spec, but the actual app binary) has **ZERO** dependencies
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 33e4e6fbd..0d56bb678 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -341,7 +341,7 @@ def handle_item(fieldarg, content):
 code_url = f"https://github.com/pytorch/torchx/archive/refs/heads/{notebook_version}.tar.gz"
 
 first_notebook_cell = f"""
-!pip install torchx[kfp]
+!pip install torchx
 
 !wget --no-clobber {code_url}
 !tar xf {notebook_version}.tar.gz --strip-components=1
@@ -351,7 +351,6 @@ def handle_item(fieldarg, content):
 sphinx_gallery_conf = {
     "examples_dirs": [
         "../../torchx/examples/apps",
-        "../../torchx/examples/pipelines",
     ],
     "gallery_dirs": [
         "examples_apps",
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 484f92f83..efed61fa6 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -4,8 +4,6 @@ TorchX
 ==================
 
 TorchX is a universal job launcher for PyTorch applications.
-TorchX is designed to have fast iteration time for training/research and support
-for E2E production ML pipelines when you're ready.
 
 **GETTING STARTED?** Follow the :ref:`quickstart guide`.
 
@@ -76,7 +74,6 @@ Works With
    schedulers/kubernetes
    schedulers/kubernetes_mcad
    schedulers/slurm
-   schedulers/ray
   schedulers/aws_batch
   schedulers/aws_sagemaker
   schedulers/lsf
@@ -91,14 +88,6 @@ Works With
    schedulers/fb/*
 
 
-.. _Pipelines:
-.. toctree::
-   :maxdepth: 1
-   :caption: Pipelines
-
-   pipelines/kfp
-   pipelines/airflow.md
-
 .. fbcode::
 
    .. toctree::
@@ -116,7 +105,6 @@ Examples
    :caption: Examples
 
    examples_apps/index
-   examples_pipelines/index
 
 
 Components Library
@@ -165,7 +153,6 @@ Reference
    runner
    schedulers
    workspace
-   pipelines
 
 .. toctree::
    :maxdepth: 1
diff --git a/docs/source/pipelines.rst b/docs/source/pipelines.rst
deleted file mode 100644
index 5569a2d37..000000000
--- a/docs/source/pipelines.rst
+++ /dev/null
@@ -1,15 +0,0 @@
-torchx.pipelines
-================
-
-.. automodule:: torchx.pipelines
-.. currentmodule:: torchx.pipelines
-
-All Pipelines
-~~~~~~~~~~~~~~~~
-
-.. toctree::
-   :maxdepth: 1
-   :glob:
-
-   pipelines/*
-
diff --git a/docs/source/pipelines/airflow.md b/docs/source/pipelines/airflow.md
deleted file mode 100644
index a0861d0b5..000000000
--- a/docs/source/pipelines/airflow.md
+++ /dev/null
@@ -1,104 +0,0 @@
----
-jupyter:
-  jupytext:
-    text_representation:
-      extension: .md
-      format_name: markdown
-      format_version: '1.3'
-    jupytext_version: 1.13.7
-  kernelspec:
-    display_name: Python 3
-    language: python
-    name: python3
----
-
-# Airflow
-
-For pipelines that support Python based execution you can directly use the
-TorchX API. TorchX is designed to be easily integrated in to other applications
-via the programmatic API. No special Airflow integrations are needed.
-
-With TorchX, you can use Airflow for the pipeline orchestration and run your
-PyTorch application (i.e. distributed training) on a remote GPU cluster.
-
-```python
-import datetime
-import pendulum
-
-from airflow.utils.state import DagRunState, TaskInstanceState
-from airflow.utils.types import DagRunType
-from airflow.models.dag import DAG
-from airflow.decorators import task
-
-
-DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
-DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
-```
-
-To launch a TorchX job from Airflow you can create a Airflow Python task to
-import the runner, launch the job and wait for it to complete. If you're running
-on a remote cluster you may need to use the virtualenv task to install the
-`torchx` package.
-
-```python
-@task(task_id=f'hello_torchx')
-def run_torchx(message):
-    """This is a function that will run within the DAG execution"""
-    from torchx.runner import get_runner
-    with get_runner() as runner:
-        # Run the utils.sh component on the local_cwd scheduler.
-        app_id = runner.run_component(
-            "utils.sh",
-            ["echo", message],
-            scheduler="local_cwd",
-        )
-
-        # Wait for the the job to complete
-        status = runner.wait(app_id, wait_interval=1)
-
-        # Raise_for_status will raise an exception if the job didn't succeed
-        status.raise_for_status()
-
-        # Finally we can print all of the log lines from the TorchX job so it
-        # will show up in the workflow logs.
-        for line in runner.log_lines(app_id, "sh", k=0):
-            print(line, end="")
-```
-
-Once we have the task defined we can put it into a Airflow DAG and run it like
-normal.
-
-```python
-from torchx.schedulers.ids import make_unique
-
-with DAG(
-    dag_id=make_unique('example_python_operator'),
-    schedule_interval=None,
-    start_date=DATA_INTERVAL_START,
-    catchup=False,
-    tags=['example'],
-) as dag:
-    run_job = run_torchx("Hello, TorchX!")
-
-
-dagrun = dag.create_dagrun(
-    state=DagRunState.RUNNING,
-    execution_date=DATA_INTERVAL_START,
-    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
-    start_date=DATA_INTERVAL_END,
-    run_type=DagRunType.MANUAL,
-)
-ti = dagrun.get_task_instance(task_id="hello_torchx")
-ti.task = dag.get_task(task_id="hello_torchx")
-ti.run(ignore_ti_state=True)
-assert ti.state == TaskInstanceState.SUCCESS
-```
-
-If all goes well you should see `Hello, TorchX!` printed above.
-
-## Next Steps
-
-* Checkout the [runner API documentation](../runner.rst) to learn more about
-  programmatic usage of TorchX
-* Browse through the collection of [builtin components](../components/overview.rst)
-  which can be used in your Airflow pipeline
diff --git a/docs/source/pipelines/kfp.rst b/docs/source/pipelines/kfp.rst
deleted file mode 100644
index cabb04493..000000000
--- a/docs/source/pipelines/kfp.rst
+++ /dev/null
@@ -1,24 +0,0 @@
-Kubeflow Pipelines
-======================
-
-TorchX provides an adapter to run TorchX components as part of Kubeflow
-Pipelines. See :ref:`examples_pipelines/index:KubeFlow Pipelines Examples`.
-
-.. image:: kfp_diagram.jpg
-
-torchx.pipelines.kfp
-#####################
-
-.. image:: pipeline_kfp_diagram.png
-
-.. automodule:: torchx.pipelines.kfp
-.. currentmodule:: torchx.pipelines.kfp
-
-.. currentmodule:: torchx.pipelines.kfp.adapter
-
-.. autofunction:: container_from_app
-.. autofunction:: resource_from_app
-.. autofunction:: component_from_app
-.. autofunction:: component_spec_from_app
-
-.. autoclass:: ContainerFactory
diff --git a/docs/source/pipelines/kfp_diagram.jpg b/docs/source/pipelines/kfp_diagram.jpg
deleted file mode 100644
index 1fea3f1f0..000000000
Binary files a/docs/source/pipelines/kfp_diagram.jpg and /dev/null differ
diff --git a/docs/source/pipelines/pipeline_kfp_diagram.png b/docs/source/pipelines/pipeline_kfp_diagram.png
deleted file mode 100644
index e69de29bb..000000000
diff --git a/docs/source/quickstart.md b/docs/source/quickstart.md
index c3aeae2e1..91bf5421d 100644
--- a/docs/source/quickstart.md
+++ b/docs/source/quickstart.md
@@ -174,7 +174,6 @@ run command for local works out of the box on remote.
 $ torchx run --scheduler slurm dist.ddp -j 2x2 --script dist_app.py
 $ torchx run --scheduler kubernetes dist.ddp -j 2x2 --script dist_app.py
 $ torchx run --scheduler aws_batch dist.ddp -j 2x2 --script dist_app.py
-$ torchx run --scheduler ray dist.ddp -j 2x2 --script dist_app.py
 ```
 
 
@@ -236,5 +235,4 @@ The `slurm` and `local_cwd` use the current environment so you can use `pip` and
 1. Checkout other features of the [torchx CLI](cli.rst)
 2. Take a look at the [list of schedulers](schedulers.rst) supported by the runner
 3. Browse through the collection of [builtin components](components/overview.rst)
-4. See which [ML pipeline platforms](pipelines.rst) you can run components on
-5. See a [training app example](examples_apps/index.rst)
+4. See a [training app example](examples_apps/index.rst)
diff --git a/docs/source/schedulers/ray.rst b/docs/source/schedulers/ray.rst
deleted file mode 100644
index b961821f9..000000000
--- a/docs/source/schedulers/ray.rst
+++ /dev/null
@@ -1,19 +0,0 @@
-Ray
-=================
-
-.. fbcode::
-   :exclude:
-
-   .. automodule:: torchx.schedulers.ray_scheduler
-
-   .. currentmodule:: torchx.schedulers.ray_scheduler
-
-   .. autoclass:: RayScheduler
-      :members:
-      :show-inheritance:
-
-   .. autofunction:: create_scheduler
-   .. autofunction:: serialize
-
-   .. autoclass:: RayJob
-      :members:
diff --git a/scripts/kfpint.py b/scripts/kfpint.py
deleted file mode 100755
index dd9cc6eac..000000000
--- a/scripts/kfpint.py
+++ /dev/null
@@ -1,282 +0,0 @@
-#!/usr/bin/env python3
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE file in the root directory of this source tree.
-
-# pyre-strict
-
-"""
-This file runs the KFP integration tests on KFP cluster. There's a number of
-environment variables that need to be setup as well as the cluster.
-
-See examples/pipelines/kfp/ for more information on how the cluster is used.
-
-Cluster setup:
-
-You'll need a KubeFlow Pipelines cluster with a torchserve instance with svc
-name torchserve on the default namespace.
- -* https://www.kubeflow.org/docs/started/installing-kubeflow/ -* https://github.com/pytorch/serve/blob/master/kubernetes/README.md - -Environment variables: - -``` -export KFP_NAMESPACE= -export INTEGRATION_TEST_STORAGE= -export TORCHX_CONTAINER_REPO= -``` - -Once you have everything setup you can just run: - -scripts/kfpint.py - - -""" - -import argparse -import asyncio -import dataclasses -import json -import os -import os.path -import shutil -import subprocess -import tempfile -import time -from contextlib import contextmanager -from typing import Any, Callable, Iterator, Optional, TypeVar - -import kfp - -from integ_test_utils import ( - build_images, - BuildInfo, - getenv_asserts, - MissingEnvError, - push_images, - run, - run_in_bg, -) -from torchx.util.types import none_throws -from urllib3.exceptions import MaxRetryError - -T = TypeVar("T") - - -def get_fn_name(fn: Callable[..., T]) -> str: - if hasattr(fn, "__qualname__"): - return fn.__qualname__ - elif hasattr(fn, "__name__"): - return fn.__name__ - else: - return "undefined" - - -def retry(f: Callable[..., T]) -> Callable[..., T]: - retries: int = 5 - backoff: int = 3 - - def wrapper(*args: Any, **kwargs: Any) -> T: - curr_retries = 0 - while True: - try: - return f(*args, **kwargs) - except: # noqa: B001 E722 - if curr_retries == retries: - raise - else: - sleep = backoff * 2**curr_retries - fn_name = get_fn_name(f) - print(f"retrying `{fn_name}` request after {sleep} seconds") - time.sleep(sleep) - curr_retries += 1 - continue - - return wrapper - - -@retry -def get_client(host: str) -> kfp.Client: - return kfp.Client(host=f"{host}/pipeline") - - -def get_free_port() -> int: - return 32001 - - -def enable_port_forward(local_port: int) -> "Optional[subprocess.Popen[str]]": - # Enable port forward via running background process. - # Kubernetes python does not support a clean way of - # Kubernetes python cli provides a socket, more info: - # https://github.com/kubernetes-client/python/blob/master/examples/pod_portforward.py - # The drawback of this method is that we have to monkey patch - # the urllib, which is used by the kfp client. - # This approach is more cleaner than to use the python cli directly. 
- try: - namespace = getenv_asserts("KFP_NAMESPACE") - except MissingEnvError: - print("Skipping port forward due to workflow executed without env variable") - return None - return run_in_bg( - "kubectl", - "port-forward", - "-n", - namespace, - "svc/ml-pipeline-ui", - f"{local_port}:80", - ) - - -META_FILE = "meta" -IMAGES_FILE = "images.tar.zst" - - -def save_advanced_pipeline_spec(path: str, build: BuildInfo) -> None: - print("generating advanced_pipeline spec") - - id = build.id - torchx_image = build.torchx_image - - STORAGE_PATH = os.getenv("INTEGRATION_TEST_STORAGE", "/tmp/storage") - root = os.path.join(STORAGE_PATH, id) - output = os.path.join(root, "output") - - save_pipeline_spec( - path, - "advanced_pipeline.py", - "--output_path", - output, - "--image", - torchx_image, - "--model_name", - f"tiny_image_net_{id}", - ) - - -def save_pipeline_spec(path: str, pipeline_file: str, *args: str) -> None: - print(f"generating pipeline spec for {pipeline_file}") - - with tempfile.TemporaryDirectory() as tmpdir: - run(os.path.join("torchx/examples/pipelines/kfp", pipeline_file), *args) - shutil.copy("pipeline.yaml", path) - - -@contextmanager -def path_or_tmp(path: Optional[str]) -> Iterator[str]: - if path: - os.makedirs(path, exist_ok=True) - yield path - else: - with tempfile.TemporaryDirectory() as tmpdir: - yield tmpdir - - -def _connection_error_message() -> str: - kfp_host = getenv_asserts("KFP_HOST") - return f""" - Unable to connect to kfp cluster using {kfp_host}. - Check that `kubectl` has proper credentials to execute port forward - """ - - -def save_build(path: str, build: BuildInfo) -> None: - meta_path = os.path.join(path, META_FILE) - with open(meta_path, "wt") as f: - json.dump(dataclasses.asdict(build), f) - - -def run_pipeline(build: BuildInfo, pipeline_file: str) -> object: - print(f"launching pipeline {pipeline_file}") - HOST: str = getenv_asserts("KFP_HOST") - - try: - client = get_client(HOST) - except MaxRetryError: - print(_connection_error_message()) - raise - resp = client.create_run_from_pipeline_package( - pipeline_file, - arguments={}, - experiment_name="integration-tests", - run_name=f"integration test {build.id} - {os.path.basename(pipeline_file)}", - ) - ui_url = f"{HOST}/#/runs/details/{resp.run_id}" - print(f"{resp.run_id} - launched! 
view run at {ui_url}") - return resp - - -def wait_for_pipeline( - resp: Any, # pyre-fixme: KFP doesn't have a response type -) -> None: - print(f"{resp.run_id} - waiting for completion") - result = resp.wait_for_run_completion( - timeout=1 * 60 * 60, - ) # 1 hour - print(f"{resp.run_id} - finished: {result}") - assert result.run.status == "Succeeded", "run didn't succeed" - - -async def exec_job() -> None: - parser = argparse.ArgumentParser(description="kfp integration test runner") - parser.add_argument( - "--path", - type=str, - help="path to place the files", - ) - parser.add_argument( - "--load", - help="if specified load the build from path instead of building", - action="store_true", - ) - parser.add_argument( - "--save", - help="if specified save the build to path and exit", - action="store_true", - ) - parser.add_argument("--container_repo", type=str) - args = parser.parse_args() - - with path_or_tmp(args.path) as path: - advanced_pipeline_file = os.path.join(path, "advanced_pipeline.yaml") - intro_pipeline_file = os.path.join(path, "intro_pipeline.yaml") - dist_pipeline_file = os.path.join(path, "dist_pipeline.yaml") - build = build_images() - try: - push_images(build, container_repo=args.container_repo) - except MissingEnvError as e: - print(f"Missing environments, only building: {e}") - return - finally: - save_advanced_pipeline_spec(advanced_pipeline_file, build) - save_pipeline_spec(intro_pipeline_file, "intro_pipeline.py") - save_pipeline_spec(dist_pipeline_file, "dist_pipeline.py") - - pipeline_files = [ - advanced_pipeline_file, - intro_pipeline_file, - dist_pipeline_file, - ] - responses = [ - run_pipeline(build, pipeline_file) for pipeline_file in pipeline_files - ] - for response in responses: - wait_for_pipeline(response) - - -def main() -> None: - port = get_free_port() - kfp_host = f"http://localhost:{port}" - os.environ["KFP_HOST"] = kfp_host - port_forward_proc = enable_port_forward(port) - try: - asyncio.run(exec_job()) - finally: - if port_forward_proc: - none_throws(port_forward_proc).kill() - - -if __name__ == "__main__": - main() diff --git a/scripts/setup_kfp.sh b/scripts/setup_kfp.sh deleted file mode 100755 index dd2c42ba4..000000000 --- a/scripts/setup_kfp.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -set -eux - -export PIPELINE_VERSION=1.8.5 -kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION" -kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io -kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION" -kubectl apply -f resources/kfp_volcano_role_binding.yaml -kubectl wait --namespace=kubeflow --for condition=available --timeout=10m deployments/metadata-grpc-deployment diff --git a/setup.py b/setup.py index 16981d5dc..4affc1d25 100644 --- a/setup.py +++ b/setup.py @@ -87,9 +87,7 @@ def get_nightly_version(): "google-cloud-logging>=3.0.0", "google-cloud-runtimeconfig>=0.33.2", ], - "kfp": ["kfp==1.6.2"], "kubernetes": ["kubernetes>=11"], - "ray": ["ray>=1.12.1"], "dev": dev_reqs, }, # PyPI package information. 
diff --git a/torchx/components/__init__.py b/torchx/components/__init__.py index a215fc4ce..428ad9aee 100644 --- a/torchx/components/__init__.py +++ b/torchx/components/__init__.py @@ -298,13 +298,6 @@ def f(i: int, f: float, s: str, b: bool, l: List[str], d: Dict[str, str], *args) * ``*args=["--help"]``: ``torchx run comp.py:f -- --help`` * ``*args=["--i", "2"]``: ``torchx run comp.py:f --i 1 -- --i 2`` -Run in a Pipeline --------------------------------- - -The :ref:`torchx.pipelines` define adapters that -convert a torchx component into the object that represents a pipeline "stage" in the -target pipeline platform (see :ref:`Pipelines` for a list of supported pipeline orchestrators). - Additional Resources ----------------------- diff --git a/torchx/examples/apps/aws/ray/ray_cluster.yaml b/torchx/examples/apps/aws/ray/ray_cluster.yaml deleted file mode 100644 index 3b0eb6507..000000000 --- a/torchx/examples/apps/aws/ray/ray_cluster.yaml +++ /dev/null @@ -1,142 +0,0 @@ -# An unique identifier for the head node and workers of this cluster. -cluster_name: gpu-docker - -min_workers: 1 -max_workers: 4 - -# The autoscaler will scale up the cluster faster with higher upscaling speed. -# E.g., if the task requires adding more nodes then autoscaler will gradually -# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. -# This number should be > 0. -upscaling_speed: 1.0 - -# This executes all commands on all nodes in the docker container, -# and opens all the necessary ports to support the Ray cluster. -# Empty string means disabled. -docker: - image: "rayproject/ray-ml:latest-gpu" - # image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull - container_name: "ray_nvidia_docker" # e.g. ray_docker - - -# If a node is idle for this many minutes, it will be removed. -idle_timeout_minutes: 5 - -# Cloud-provider specific configuration. -provider: - type: aws - region: us-west-2 - # Availability zone(s), comma-separated, that nodes may be launched in. - # Nodes are currently spread between zones by a round-robin approach, - # however this implementation detail should not be relied upon. - availability_zone: us-west-2a,us-west-2b - security_group: - GroupName: dashboard_group - IpPermissions: - - FromPort: 20002 - ToPort: 20002 - IpProtocol: TCP - IpRanges: - - CidrIp: 0.0.0.0/0 - - -# How Ray will authenticate with newly launched nodes. -auth: - ssh_user: ubuntu -# By default Ray creates a new private keypair, but you can also use your own. -# If you do so, make sure to also set "KeyName" in the head and worker node -# configurations below. -# ssh_private_key: /path/to/your/key.pem - -# Tell the autoscaler the allowed node types and the resources they provide. -# The key is the name of the node type, which is just for debugging purposes. -# The node config specifies the launch config and physical instance type. -available_node_types: - # CPU head node. - ray.head.cpu: - # worker_image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull - # The node type's CPU and GPU resources are auto-detected based on AWS instance type. - # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler. - # You can also set custom resources. - # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set - # resources: {"CPU": 1, "GPU": 1, "custom": 5} - resources: {} - # Provider-specific config for this node type, e.g. 
instance type. By default - # Ray will auto-configure unspecified fields such as SubnetId and KeyName. - # For more documentation on available fields, see: - # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances - node_config: - InstanceType: m5.large - ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30 - # You can provision additional disk space with a conf as follows - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 200 - # Additional options in the boto docs. - # CPU workers. - ray.worker.default: - # Override global docker setting. - # This node type will run a CPU image, - # rather than the GPU image specified in the global docker settings. - docker: - worker_image: "rayproject/ray-ml:latest-cpu" - # The minimum number of nodes of this type to launch. - # This number should be >= 0. - min_workers: 1 - # The maximum number of workers nodes of this type to launch. - # This takes precedence over min_workers. - max_workers: 2 - # The node type's CPU and GPU resources are auto-detected based on AWS instance type. - # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler. - # You can also set custom resources. - # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set - # resources: {"CPU": 1, "GPU": 1, "custom": 5} - resources: {} - # Provider-specific config for this node type, e.g. instance type. By default - # Ray will auto-configure unspecified fields such as SubnetId and KeyName. - # For more documentation on available fields, see: - # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances - node_config: - InstanceType: m5.large - ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30 - # Run workers on spot by default. Comment this out to use on-demand. - InstanceMarketOptions: - MarketType: spot - # Additional options can be found in the boto docs, e.g. - # SpotOptions: - # MaxPrice: MAX_HOURLY_PRICE - # Additional options in the boto docs. - -# Specify the node type of the head node (as configured above). -head_node_type: ray.head.cpu - -# Files or directories to copy to the head and worker nodes. The format is a -# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. -file_mounts: { - # "/path1/on/remote/machine": "/path1/on/local/machine", - # "/path2/on/remote/machine": "/path2/on/local/machine", -} - -# List of shell commands to run to set up nodes. -# NOTE: rayproject/ray:latest has ray latest bundled -setup_commands: [] -# - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl -# - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl" - -# Custom commands that will be run on the head node after common setup. -head_setup_commands: - - pip install boto3==1.4.8 # 1.4.8 adds InstanceMarketOptions - -# Custom commands that will be run on worker nodes after common setup. -worker_setup_commands: [] - -# Command to start ray on the head node. You don't need to change this. -head_start_ray_commands: - - ray stop - - ulimit -n 65536; ray start --dashboard-port 20002 --dashboard-host=0.0.0.0 --include-dashboard True --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml - -# Command to start ray on worker nodes. You don't need to change this. 
-worker_start_ray_commands: - - ray stop - - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/torchx/examples/pipelines/README.rst b/torchx/examples/pipelines/README.rst deleted file mode 100644 index 59f79deca..000000000 --- a/torchx/examples/pipelines/README.rst +++ /dev/null @@ -1,4 +0,0 @@ -Pipelines Examples -================== - -This contains examples of using TorchX components as part of pipelines. diff --git a/torchx/examples/pipelines/__init__.py b/torchx/examples/pipelines/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/examples/pipelines/kfp/README.rst b/torchx/examples/pipelines/kfp/README.rst deleted file mode 100644 index 26de2d7e0..000000000 --- a/torchx/examples/pipelines/kfp/README.rst +++ /dev/null @@ -1,6 +0,0 @@ -KubeFlow Pipelines Examples -########################### - -Each of these files is a python file that generates a Kubeflow Pipeline -definition that uses TorchX components. The generated ``pipeline.yaml`` files -can be uploaded to a KFP cluster to run the pipeline. diff --git a/torchx/examples/pipelines/kfp/__init__.py b/torchx/examples/pipelines/kfp/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/examples/pipelines/kfp/advanced_pipeline.py b/torchx/examples/pipelines/kfp/advanced_pipeline.py deleted file mode 100755 index e84e6ee52..000000000 --- a/torchx/examples/pipelines/kfp/advanced_pipeline.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -Advanced KubeFlow Pipelines Example -=================================== - -This is an example pipeline using KubeFlow Pipelines built with only TorchX -components. - -KFP adapters can be used transform the TorchX components directly into -something that can be used within KFP. -""" - -# %% -# Input Arguments -# ############### -# Lets first define some arguments for the pipeline. - -import argparse -import os.path -import sys -from typing import Dict - -import kfp -import torchx -from torchx import specs -from torchx.components.dist import ddp as dist_ddp -from torchx.components.serve import torchserve -from torchx.components.utils import copy as utils_copy, python as utils_python -from torchx.pipelines.kfp.adapter import container_from_app - - -parser = argparse.ArgumentParser(description="example kfp pipeline") - -# %% -# TorchX components are built around images. Depending on what scheduler -# you're using this can vary but for KFP these images are specified as -# docker containers. We have one container for the example apps and one for -# the standard built in apps. If you modify the torchx example code you'll -# need to rebuild the container before launching it on KFP - - -parser.add_argument( - "--image", - type=str, - help="docker image to use for the examples apps", - default=torchx.IMAGE, -) - -# %% -# Most TorchX components use -# `fsspec `_ to abstract -# away dealing with remote filesystems. This allows the components to take -# paths like ``s3://`` to make it easy to use cloud storage providers. 
-parser.add_argument( - "--output_path", - type=str, - help="path to place the data", - required=True, -) -parser.add_argument("--load_path", type=str, help="checkpoint path to load from") - -# %% -# This example uses the torchserve for inference so we need to specify some -# options. This assumes you have a TorchServe instance running in the same -# Kubernetes cluster with with the service name ``torchserve`` in the default -# namespace. -# -# See https://github.com/pytorch/serve/blob/master/kubernetes/README.md for info -# on how to setup TorchServe. -parser.add_argument( - "--management_api", - type=str, - help="path to the torchserve management API", - default="http://torchserve.default.svc.cluster.local:8081", -) -parser.add_argument( - "--model_name", - type=str, - help="the name of the inference model", - default="tiny_image_net", -) - -# %% Parse the arguments, you'll need to set these accordingly if running from a -# notebook. - - -if "NOTEBOOK" in globals(): - argv = [ - "--output_path", - "/tmp/output", - ] -else: - argv = sys.argv[1:] - -args: argparse.Namespace = parser.parse_args(argv) - -# %% -# Creating the Components -# ####################### -# The first step is downloading the data to somewhere we can work on it. For -# this we can just the builtin copy component. This component takes two valid -# fsspec paths and copies them from one to another. In this case we're using -# http as the source and a file under the output_path as the output. - - -data_path: str = os.path.join(args.output_path, "tiny-imagenet-200.zip") -copy_app: specs.AppDef = utils_copy( - "http://cs231n.stanford.edu/tiny-imagenet-200.zip", - data_path, - image=args.image, -) - -# %% -# The next component is for data preprocessing. This takes in the raw data from -# the previous operator and runs some transforms on it for use with the trainer. -# -# datapreproc outputs the data to a specified fsspec path. These paths are all -# specified ahead of time so we have a fully static pipeline. - - -processed_data_path: str = os.path.join(args.output_path, "processed") -datapreproc_app: specs.AppDef = utils_python( - "--output_path", - processed_data_path, - "--input_path", - data_path, - "--limit", - "100", - image=args.image, - m="torchx.examples.apps.datapreproc.datapreproc", - cpu=1, - memMB=1024, -) - -# %% -# Next we'll create the trainer component that takes in the training data from the -# previous datapreproc component. We've defined this in a separate component -# file as you normally would. -# -# Having a separate component file allows you to launch your trainer from the -# TorchX CLI via ``torchx run`` for fast iteration as well as run it from a -# pipeline in an automated fashion. - -# make sure examples is on the path -if "__file__" in globals(): - sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "..")) - - -logs_path: str = os.path.join(args.output_path, "logs") -models_path: str = os.path.join(args.output_path, "models") - -trainer_app: specs.AppDef = dist_ddp( - *( - "--output_path", - models_path, - "--load_path", - args.load_path or "", - "--log_path", - logs_path, - "--data_path", - processed_data_path, - "--epochs", - str(1), - ), - image=args.image, - m="torchx.examples.apps.lightning.train", - j="1x1", - # per node resource settings - cpu=1, - memMB=3000, -) - -# %% -# To have the tensorboard path show up in KFPs UI we need to some metadata so -# KFP knows where to consume the metrics from. -# -# This will get used when we create the KFP container. 
- - -ui_metadata: Dict[str, object] = { - "outputs": [ - { - "type": "tensorboard", - "source": os.path.join(logs_path, "lightning_logs"), - } - ] -} - -# %% -# For the inference, we're leveraging one of the builtin TorchX components. This -# component takes in a model and uploads it to the TorchServe management API -# endpoints. - - -serve_app: specs.AppDef = torchserve( - model_path=os.path.join(models_path, "model.mar"), - management_api=args.management_api, - image=args.image, - params={ - "model_name": args.model_name, - # set this to allocate a worker - # "initial_workers": 1, - }, -) - -# %% -# For model interpretability we're leveraging a custom component stored in it's -# own component file. This component takes in the output from datapreproc and -# train components and produces images with integrated gradient results. - -interpret_path: str = os.path.join(args.output_path, "interpret") -interpret_app: specs.AppDef = utils_python( - *( - "--load_path", - os.path.join(models_path, "last.ckpt"), - "--data_path", - processed_data_path, - "--output_path", - interpret_path, - ), - image=args.image, - m="torchx.examples.apps.lightning.interpret", -) - -# %% -# Pipeline Definition -# ################### -# The last step is to define the actual pipeline using the torchx components via -# the KFP adapter and export the pipeline package that can be uploaded to a KFP -# cluster. -# -# The KFP adapter currently doesn't track the input and outputs so the -# containers need to have their dependencies specified via `.after()`. -# -# We call `.set_tty()` to make the logs from the components more responsive for -# example purposes. - - -def pipeline() -> None: - # container_from_app creates a KFP container from the TorchX app - # definition. - copy = container_from_app(copy_app) - copy.container.set_tty() - - datapreproc = container_from_app(datapreproc_app) - datapreproc.container.set_tty() - datapreproc.after(copy) - - # For the trainer we want to log that UI metadata so you can access - # tensorboard from the UI. - trainer = container_from_app(trainer_app, ui_metadata=ui_metadata) - trainer.container.set_tty() - trainer.after(datapreproc) - - if False: - serve = container_from_app(serve_app) - serve.container.set_tty() - serve.after(trainer) - - if False: - # Serve and interpret only require the trained model so we can run them - # in parallel to each other. - interpret = container_from_app(interpret_app) - interpret.container.set_tty() - interpret.after(trainer) - - -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) - -with open("pipeline.yaml", "rt") as f: - print(f.read()) - -# %% -# Once this has all run you should have a pipeline file (typically -# pipeline.yaml) that you can upload to your KFP cluster via the UI or -# a kfp.Client. - -# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png' diff --git a/torchx/examples/pipelines/kfp/dist_pipeline.py b/torchx/examples/pipelines/kfp/dist_pipeline.py deleted file mode 100755 index 4cf8f2e05..000000000 --- a/torchx/examples/pipelines/kfp/dist_pipeline.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. 
- -# pyre-strict - -""" -Distributed KubeFlow Pipelines Example -====================================== - -This is an example KFP pipeline that uses resource_from_app to launch a -distributed operator using the kubernetes/volcano job scheduler. This only works -in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them. -""" - -import kfp -from torchx import specs -from torchx.pipelines.kfp.adapter import resource_from_app - - -def pipeline() -> None: - # First we define our AppDef for the component, we set - echo_app = specs.AppDef( - name="test-dist", - roles=[ - specs.Role( - name="dist-echo", - image="alpine", - entrypoint="/bin/echo", - args=["hello dist!"], - num_replicas=3, - ), - ], - ) - - # To convert the TorchX AppDef into a KFP container we use - # the resource_from_app adapter. This takes generates a KFP Kubernetes - # resource operator definition from the TorchX app def and instantiates it. - echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="default") - - -# %% -# To generate the pipeline definition file we need to call into the KFP compiler -# with our pipeline function. - -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) - -with open("pipeline.yaml", "rt") as f: - print(f.read()) - -# %% -# Once this has all run you should have a pipeline file (typically -# pipeline.yaml) that you can upload to your KFP cluster via the UI or -# a kfp.Client. -# -# See the -# `KFP SDK Examples `_ -# for more info on launching KFP pipelines. - -# %% -# See the :ref:`examples_pipelines/kfp/advanced_pipeline:Advanced KubeFlow Pipelines Example` for how to chain multiple -# components together and use builtin components. - - -# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png' diff --git a/torchx/examples/pipelines/kfp/intro_pipeline.py b/torchx/examples/pipelines/kfp/intro_pipeline.py deleted file mode 100755 index 07130b338..000000000 --- a/torchx/examples/pipelines/kfp/intro_pipeline.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -Intro KubeFlow Pipelines Example -================================ - -This an introductory pipeline using KubeFlow Pipelines built with only TorchX -components. - -TorchX is intended to allow making cross platform components. As such, we have -a standard definition that uses adapters to convert it to the specific -pipeline platform. This is an example of using the KFP adapter to run a TorchX -component as part of a KubeFlow Pipeline. - -TorchX tries to leverage standard mechanisms wherever possible. For KFP we use -the existing KFP pipeline definition syntax and add a single -`component_from_app` conversion step to convert a TorchX component into one -KFP can understand. - -Typically you have a separate component file but for this example we define the -AppDef inline. -""" - -import kfp -from torchx import specs -from torchx.pipelines.kfp.adapter import container_from_app - - -def pipeline() -> None: - # First we define our AppDef for the component. AppDef is a core part of TorchX - # and can be used to describe complex distributed multi container apps or - # just a single node component like here. 
- echo_app: specs.AppDef = specs.AppDef( - name="examples-intro", - roles=[ - specs.Role( - name="worker", - entrypoint="/bin/echo", - args=["Hello TorchX!"], - image="alpine", - ) - ], - ) - - # To convert the TorchX AppDef into a KFP container we use - # the container_from_app adapter. This takes generates a KFP component - # definition from the TorchX app def and instantiates it into a container. - echo_container: kfp.dsl.ContainerOp = container_from_app(echo_app) - - -# %% -# To generate the pipeline definition file we need to call into the KFP compiler -# with our pipeline function. - -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) - -with open("pipeline.yaml", "rt") as f: - print(f.read()) - -# %% -# Once this has all run you should have a pipeline file (typically -# pipeline.yaml) that you can upload to your KFP cluster via the UI or -# a kfp.Client. -# -# See the -# `KFP SDK Examples `_ -# for more info on launching KFP pipelines. - -# %% -# See the :ref:`examples_pipelines/kfp/advanced_pipeline:Advanced KubeFlow Pipelines Example` for how to chain multiple -# components together and use builtin components. - -# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png' diff --git a/torchx/examples/pipelines/kfp/test/__init__.py b/torchx/examples/pipelines/kfp/test/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py b/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py deleted file mode 100644 index f4d26da4b..000000000 --- a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import os -import os.path -import sys -import tempfile -import unittest - - -class KFPPipelineTest(unittest.TestCase): - def setUp(self) -> None: - self.dir = tempfile.TemporaryDirectory() # noqa: P201 - self.orig_dir = os.getcwd() - os.chdir(self.dir.name) - - def tearDown(self) -> None: - os.chdir(self.orig_dir) - self.dir.cleanup() - - def test_kfp_pipeline(self) -> None: - sys.argv = [ - "advanced_pipeline.py", - "--output_path", - "bar", - ] - from torchx.examples.pipelines.kfp import advanced_pipeline # noqa: F401 - - self.assertTrue(os.path.exists("pipeline.yaml")) - - def test_intro_pipeline(self) -> None: - sys.argv = ["intro_pipeline.py"] - from torchx.examples.pipelines.kfp import intro_pipeline # noqa: F401 - - self.assertTrue(os.path.exists("pipeline.yaml")) - - def test_dist_pipeline(self) -> None: - sys.argv = ["dist_pipeline.py"] - from torchx.examples.pipelines.kfp import dist_pipeline # noqa: F401 - - self.assertTrue(os.path.exists("pipeline.yaml")) diff --git a/torchx/pipelines/kfp/__init__.py b/torchx/pipelines/kfp/__init__.py deleted file mode 100644 index 1adeede50..000000000 --- a/torchx/pipelines/kfp/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -This module contains adapters for converting TorchX components into KubeFlow -Pipeline components. - -The current KFP adapters only support single node (1 role and 1 replica) -components. 
-""" - -import kfp - -from .version import __version__ as __version__ # noqa F401 - - -def _check_kfp_version() -> None: - if not kfp.__version__.startswith("1."): - raise ImportError( - f"Only kfp version 1.x.x is supported! kfp version {kfp.__version__}" - ) - - -_check_kfp_version() diff --git a/torchx/pipelines/kfp/adapter.py b/torchx/pipelines/kfp/adapter.py deleted file mode 100644 index 427f25f44..000000000 --- a/torchx/pipelines/kfp/adapter.py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import json -import os -import os.path -import shlex -from typing import Mapping, Optional, Tuple - -import yaml -from kfp import components, dsl - -# @manual=fbsource//third-party/pypi/kfp:kfp -from kfp.components.structures import ComponentSpec, OutputSpec -from kubernetes.client.models import ( - V1ContainerPort, - V1EmptyDirVolumeSource, - V1Volume, - V1VolumeMount, -) -from torchx.schedulers.kubernetes_scheduler import app_to_resource, pod_labels -from torchx.specs import api -from typing_extensions import Protocol - -from .version import __version__ as __version__ # noqa F401 - - -def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]: - """ - component_spec_from_app takes in a TorchX component and generates the yaml - spec for it. Notably this doesn't apply resources or port_maps since those - must be applied at runtime which is why it returns the role spec as well. - - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import component_spec_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> component_spec_from_app(app_def) - ('description: ...', Role(...)) - """ - assert len(app.roles) == 1, f"KFP adapter only support one role, got {app.roles}" - - role = app.roles[0] - assert ( - role.num_replicas - == 1 - # pyre-fixme[16]: `AppDef` has no attribute `num_replicas`. - ), f"KFP adapter only supports one replica, got {app.num_replicas}" - - command = [role.entrypoint, *role.args] - - spec = { - "name": f"{app.name}-{role.name}", - "description": f"KFP wrapper for TorchX component {app.name}, role {role.name}", - "implementation": { - "container": { - "image": role.image, - "command": command, - "env": role.env, - } - }, - "outputs": [], - } - return yaml.dump(spec), role - - -class ContainerFactory(Protocol): - """ - ContainerFactory is a protocol that represents a function that when called produces a - kfp.dsl.ContainerOp. - """ - - def __call__(self, *args: object, **kwargs: object) -> dsl.ContainerOp: ... - - -class KFPContainerFactory(ContainerFactory, Protocol): - """ - KFPContainerFactory is a ContainerFactory that also has some KFP metadata - attached to it. - """ - - component_spec: ComponentSpec - - -METADATA_FILE = "/tmp/outputs/mlpipeline-ui-metadata/data.json" - - -def component_from_app( - app: api.AppDef, ui_metadata: Optional[Mapping[str, object]] = None -) -> ContainerFactory: - """ - component_from_app takes in a TorchX component/AppDef and returns a KFP - ContainerOp factory. This is equivalent to the - `kfp.components.load_component_from_* - `_ - methods. - - Args: - app: The AppDef to generate a KFP container factory for. - ui_metadata: KFP UI Metadata to output so you can have model results show - up in the UI. 
See - https://www.kubeflow.org/docs/components/pipelines/legacy-v1/sdk/output-viewer/ - for more info on the format. - - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import component_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> component_from_app(app_def) - - """ - - role_spec: api.Role - spec, role_spec = component_spec_from_app(app) - resources: api.Resource = role_spec.resource - assert ( - len(resources.capabilities) == 0 - ), f"KFP doesn't support capabilities, got {resources.capabilities}" - component_factory: KFPContainerFactory = components.load_component_from_text(spec) - - if ui_metadata is not None: - # pyre-fixme[16]: `ComponentSpec` has no attribute `outputs` - component_factory.component_spec.outputs.append( - OutputSpec( - name="mlpipeline-ui-metadata", - type="MLPipeline UI Metadata", - description="ui metadata", - ) - ) - - def factory_wrapper(*args: object, **kwargs: object) -> dsl.ContainerOp: - c = component_factory(*args, **kwargs) - container = c.container - - if ui_metadata is not None: - # We generate the UI metadata from the sidecar so we need to make - # both the container and the sidecar share the same tmp directory so - # the outputs appear in the original container. - c.add_volume(V1Volume(name="tmp", empty_dir=V1EmptyDirVolumeSource())) - container.add_volume_mount( - V1VolumeMount( - name="tmp", - mount_path="/tmp/", - ) - ) - c.output_artifact_paths["mlpipeline-ui-metadata"] = METADATA_FILE - c.add_sidecar(_ui_metadata_sidecar(ui_metadata)) - - cpu = resources.cpu - if cpu >= 0: - cpu_str = f"{int(cpu*1000)}m" - container.set_cpu_request(cpu_str) - container.set_cpu_limit(cpu_str) - mem = resources.memMB - if mem >= 0: - mem_str = f"{int(mem)}M" - container.set_memory_request(mem_str) - container.set_memory_limit(mem_str) - gpu = resources.gpu - if gpu > 0: - container.set_gpu_limit(str(gpu)) - - for name, port in role_spec.port_map.items(): - container.add_port( - V1ContainerPort( - name=name, - container_port=port, - ), - ) - - c.pod_labels.update(pod_labels(app, 0, role_spec, 0, app.name)) - - return c - - return factory_wrapper - - -def _ui_metadata_sidecar( - ui_metadata: Mapping[str, object], image: str = "alpine" -) -> dsl.Sidecar: - shell_encoded = shlex.quote(json.dumps(ui_metadata)) - dirname = os.path.dirname(METADATA_FILE) - return dsl.Sidecar( - name="ui-metadata-sidecar", - image=image, - command=[ - "sh", - "-c", - f"mkdir -p {dirname}; echo {shell_encoded} > {METADATA_FILE}", - ], - mirror_volume_mounts=True, - ) - - -def container_from_app( - app: api.AppDef, - *args: object, - ui_metadata: Optional[Mapping[str, object]] = None, - **kwargs: object, -) -> dsl.ContainerOp: - """ - container_from_app transforms the app into a KFP component and returns a - corresponding ContainerOp instance. - - See component_from_app for description on the arguments. Any unspecified - arguments are passed through to the KFP container factory method. - - >>> import kfp - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import container_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> def pipeline(): - ... trainer = container_from_app(app_def) - ... print(trainer) - >>> kfp.compiler.Compiler().compile( - ... pipeline_func=pipeline, - ... package_path="/tmp/pipeline.yaml", - ... ) - {'ContainerOp': {... 
'name': 'trainer-trainer', ...}} - """ - factory = component_from_app(app, ui_metadata) - return factory(*args, **kwargs) - - -def resource_from_app( - app: api.AppDef, - queue: str, - service_account: Optional[str] = None, -) -> dsl.ResourceOp: - """ - resource_from_app generates a KFP ResourceOp from the provided app that uses - the Volcano job scheduler on Kubernetes to run distributed apps. See - https://volcano.sh/en/docs/ for more info on Volcano and how to install. - - Args: - app: The torchx AppDef to adapt. - queue: the Volcano queue to schedule the operator in. - - >>> import kfp - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import resource_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)], - ... ) - >>> def pipeline(): - ... trainer = resource_from_app(app_def, queue="test") - ... print(trainer) - >>> kfp.compiler.Compiler().compile( - ... pipeline_func=pipeline, - ... package_path="/tmp/pipeline.yaml", - ... ) - {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}} - """ - return dsl.ResourceOp( - name=app.name, - action="create", - success_condition="status.state.phase = Completed", - failure_condition="status.state.phase = Failed", - k8s_resource=app_to_resource(app, queue, service_account=service_account), - ) diff --git a/torchx/pipelines/kfp/test/__init__.py b/torchx/pipelines/kfp/test/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/pipelines/kfp/test/adapter_test.py b/torchx/pipelines/kfp/test/adapter_test.py deleted file mode 100644 index df7b743a8..000000000 --- a/torchx/pipelines/kfp/test/adapter_test.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. 
- -# pyre-strict - -import os.path -import tempfile -import unittest -from typing import Callable, List - -import torchx -import yaml -from kfp import compiler, components, dsl -from kubernetes.client.models import V1ContainerPort, V1ResourceRequirements -from torchx.pipelines.kfp.adapter import ( - component_from_app, - component_spec_from_app, - container_from_app, - ContainerFactory, -) -from torchx.specs import api - - -class KFPSpecsTest(unittest.TestCase): - """ - tests KFP components using torchx.specs.api - """ - - def _test_app(self) -> api.AppDef: - trainer_role = api.Role( - name="trainer", - image="pytorch/torchx:latest", - entrypoint="main", - args=["--output-path", "blah"], - env={"FOO": "bar"}, - resource=api.Resource( - cpu=2, - memMB=3000, - gpu=4, - ), - port_map={"foo": 1234}, - num_replicas=1, - ) - - return api.AppDef("test", roles=[trainer_role]) - - def _compile_pipeline(self, pipeline: Callable[[], None]) -> None: - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_file = os.path.join(tmpdir, "pipeline.yaml") - compiler.Compiler().compile(pipeline, pipeline_file) - with open(pipeline_file, "r") as f: - data = yaml.safe_load(f) - - spec = data["spec"] - templates = spec["templates"] - self.assertGreaterEqual(len(templates), 2) - - def test_component_spec_from_app(self) -> None: - app = self._test_app() - - spec, role = component_spec_from_app(app) - self.assertIsNotNone(components.load_component_from_text(spec)) - self.assertEqual(role.resource, app.roles[0].resource) - self.assertEqual( - spec, - """description: KFP wrapper for TorchX component test, role trainer -implementation: - container: - command: - - main - - --output-path - - blah - env: - FOO: bar - image: pytorch/torchx:latest -name: test-trainer -outputs: [] -""", - ) - - def test_pipeline(self) -> None: - app = self._test_app() - kfp_copy: ContainerFactory = component_from_app(app) - - def pipeline() -> None: - a = kfp_copy() - resources: V1ResourceRequirements = a.container.resources - self.assertEqual( - resources, - V1ResourceRequirements( - limits={ - "cpu": "2000m", - "memory": "3000M", - "nvidia.com/gpu": "4", - }, - requests={ - "cpu": "2000m", - "memory": "3000M", - }, - ), - ) - ports: List[V1ContainerPort] = a.container.ports - self.assertEqual( - ports, - [V1ContainerPort(name="foo", container_port=1234)], - ) - - b = kfp_copy() - b.after(a) - - self._compile_pipeline(pipeline) - - def test_pipeline_metadata(self) -> None: - app = self._test_app() - metadata = {} - kfp_copy: ContainerFactory = component_from_app(app, metadata) - - def pipeline() -> None: - a = kfp_copy() - self.assertEqual(len(a.volumes), 1) - self.assertEqual(len(a.container.volume_mounts), 1) - self.assertEqual(len(a.sidecars), 1) - self.assertEqual( - a.output_artifact_paths["mlpipeline-ui-metadata"], - "/tmp/outputs/mlpipeline-ui-metadata/data.json", - ) - self.assertEqual( - a.pod_labels, - { - "app.kubernetes.io/instance": "test", - "app.kubernetes.io/managed-by": "torchx.pytorch.org", - "app.kubernetes.io/name": "test", - "torchx.pytorch.org/version": torchx.__version__, - "torchx.pytorch.org/app-name": "test", - "torchx.pytorch.org/role-index": "0", - "torchx.pytorch.org/role-name": "trainer", - "torchx.pytorch.org/replica-id": "0", - }, - ) - - self._compile_pipeline(pipeline) - - def test_container_from_app(self) -> None: - app: api.AppDef = self._test_app() - - def pipeline() -> None: - a: dsl.ContainerOp = container_from_app(app) - b: dsl.ContainerOp = container_from_app(app) - b.after(a) - - 
self._compile_pipeline(pipeline) diff --git a/torchx/pipelines/kfp/test/suites.py b/torchx/pipelines/kfp/test/suites.py deleted file mode 100644 index bb37bff19..000000000 --- a/torchx/pipelines/kfp/test/suites.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -import os -import random -import unittest -from itertools import chain - - -def _circleci_parallelism(suite: unittest.TestSuite) -> unittest.TestSuite: - """Allow for parallelism in CircleCI for speedier tests..""" - if int(os.environ.get("CIRCLE_NODE_TOTAL", 0)) <= 1: - # either not running on circleci, or we're not using parallelism. - return suite - # tests are automatically sorted by discover, so we will get the same ordering - # on all hosts. - total = int(os.environ["CIRCLE_NODE_TOTAL"]) - index = int(os.environ["CIRCLE_NODE_INDEX"]) - - # right now each test is corresponds to a /file/. Certain files are slower than - # others, so we want to flatten it - # pyre-fixme[16]: `TestCase` has no attribute `_tests`. - tests = [testfile._tests for testfile in suite._tests] - tests = list(chain.from_iterable(tests)) - random.Random(42).shuffle(tests) - tests = [t for i, t in enumerate(tests) if i % total == index] - return unittest.TestSuite(tests) - - -def unittests() -> unittest.TestSuite: - """ - Short tests. - - Runs on CircleCI on every commit. Returns everything in the tests root directory. - """ - test_loader = unittest.TestLoader() - test_suite = test_loader.discover( - "torchx/kfp", pattern="*_test.py", top_level_dir="." - ) - test_suite = _circleci_parallelism(test_suite) - return test_suite - - -if __name__ == "__main__": - runner = unittest.TextTestRunner() - runner.run(unittests()) diff --git a/torchx/pipelines/kfp/test/version_test.py b/torchx/pipelines/kfp/test/version_test.py deleted file mode 100644 index f932f5b7b..000000000 --- a/torchx/pipelines/kfp/test/version_test.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import importlib -import unittest -from unittest.mock import patch - - -class VersionTest(unittest.TestCase): - def test_can_get_version(self) -> None: - import torchx.pipelines.kfp - - self.assertIsNotNone(torchx.pipelines.kfp.__version__) - - def test_kfp_1x(self) -> None: - import torchx.pipelines.kfp - - with patch("kfp.__version__", "2.0.1"): - with self.assertRaisesRegex(ImportError, "Only kfp version"): - importlib.reload(torchx.pipelines.kfp) - - with patch("kfp.__version__", "1.5.0"): - importlib.reload(torchx.pipelines.kfp) diff --git a/torchx/pipelines/kfp/version.py b/torchx/pipelines/kfp/version.py deleted file mode 100644 index a9244e289..000000000 --- a/torchx/pipelines/kfp/version.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. 
- -# pyre-strict - -# Follows PEP-0440 version scheme guidelines -# https://www.python.org/dev/peps/pep-0440/#version-scheme -# -# Examples: -# 0.1.0.devN # Developmental release -# 0.1.0aN # Alpha release -# 0.1.0bN # Beta release -# 0.1.0rcN # Release Candidate -# 0.1.0 # Final release -__version__ = "0.1.0.dev0" diff --git a/torchx/schedulers/__init__.py b/torchx/schedulers/__init__.py index c48cebaeb..a0621c1d2 100644 --- a/torchx/schedulers/__init__.py +++ b/torchx/schedulers/__init__.py @@ -22,7 +22,6 @@ "aws_batch": "torchx.schedulers.aws_batch_scheduler", "aws_sagemaker": "torchx.schedulers.aws_sagemaker_scheduler", "gcp_batch": "torchx.schedulers.gcp_batch_scheduler", - "ray": "torchx.schedulers.ray_scheduler", "lsf": "torchx.schedulers.lsf_scheduler", } diff --git a/torchx/schedulers/ray/__init__.py b/torchx/schedulers/ray/__init__.py deleted file mode 100644 index a9fdb3b99..000000000 --- a/torchx/schedulers/ray/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. diff --git a/torchx/schedulers/ray/ray_common.py b/torchx/schedulers/ray/ray_common.py deleted file mode 100644 index 028e72f56..000000000 --- a/torchx/schedulers/ray/ray_common.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -from dataclasses import dataclass, field -from typing import Dict, List, Optional - -TORCHX_RANK0_HOST: str = "TORCHX_RANK0_HOST" - - -@dataclass -class RayActor: - """Describes an actor (a.k.a. worker/replica in TorchX terms).""" - - name: str - command: List[str] - env: Dict[str, str] = field(default_factory=dict) - num_cpus: int = 1 - num_gpus: int = 0 - min_replicas: Optional[int] = None diff --git a/torchx/schedulers/ray/ray_driver.py b/torchx/schedulers/ray/ray_driver.py deleted file mode 100644 index cd7d647c4..000000000 --- a/torchx/schedulers/ray/ray_driver.py +++ /dev/null @@ -1,307 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -""" -We use placement groups to reserve resources in the ray cluster, it -ensure that a job will not lose the resources it used to have before -the job is finished. The deadlock situtation while launch multiple jobs at the -same time is avoided by create a big placement group that contains the minimum -required command actors for the job. Once the placement groups are created(may -not be scheduled on a physical node yet), then we schedule command actors to -the corresponding placement group, each actor is associated with a placement -group which hold the resource the acotr needs. Each time a placement group successfully -acquired the resources from the ray cluster, the actor scheduled to this placement group -will be executed. Command actors are state machines their behavior is defined by the -_step function, this give more flexibility to us if we want to bette handle the -node failures. 
-""" -import json -import logging -import os -import socket -import subprocess -import sys - -from contextlib import closing -from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple, TYPE_CHECKING - -import ray -from ray.util.placement_group import PlacementGroup - -if TYPE_CHECKING: - from torchx.schedulers.ray.ray_common import RayActor, TORCHX_RANK0_HOST - -# Hack to make code work for tests as well as running ray job. -# For tests the `torchx.schedulers.ray.ray_common` import must be used -# For running ray jobs `ray_common` import must be used -try: - # pyre-fixme[21]: Could not find a module corresponding to import `ray_common`. - from ray_common import RayActor, TORCHX_RANK0_HOST # noqa: F811 -except ModuleNotFoundError: - from torchx.schedulers.ray.ray_common import RayActor, TORCHX_RANK0_HOST - -_logger: logging.Logger = logging.getLogger(__name__) -_logger.setLevel(logging.getLevelName(os.environ.get("LOGLEVEL", "INFO"))) -logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) - - -@dataclass -class RayResult: - id: str - - -class TaskCompleted(RayResult): - pass - - -class CommandActorScheduled(RayResult): - pass - - -@ray.remote -class CommandActor: # pragma: no cover - def __init__(self, cmd: List[str], env: Dict[str, str]) -> None: - self.cmd: List[str] = cmd - self.env: Dict[str, str] = env - - def exec_module( - self, master_addr: str, master_port: int, actor_id: str - ) -> TaskCompleted: - """Execute a user script""" - if master_addr is None or master_port is None: - raise RuntimeError( - "Either MASTER_ADDR or MASTER_PORT are not set. This is most likely bug in torchx" - "Open issue at https://github.com/pytorch/torchx" - ) - worker_evn = {} - worker_evn.update(os.environ) - worker_evn.update(self.env) - worker_evn[TORCHX_RANK0_HOST] = master_addr - popen = subprocess.Popen(self.cmd, env=worker_evn) - - returncode = popen.wait() - _logger.info(f"Finished with code {returncode}") - - if returncode != 0: - raise RuntimeError(f"exec_module failed with return code {returncode}") - - return TaskCompleted(actor_id) - - def schedule(self, actor_id: str) -> CommandActorScheduled: - """Testing if a command actor is scheduled""" - return CommandActorScheduled(actor_id) - - def get_actor_address_and_port(self) -> Tuple[str, int]: - addr = ray.util.get_node_ip_address() - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(("", 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - port = s.getsockname()[1] - return addr, port - - -def load_actor_json(filename: str) -> List[RayActor]: - """Loading replicas specifications from a JSON file""" - with open(filename) as f: - actors: List[RayActor] = [] - # Yes this is gross but it works - actor_dict = json.load(f) - actor_dict = json.loads(actor_dict) - for actor in actor_dict: - actors.append(RayActor(**actor)) - return actors - - -def create_placement_group_async(replicas: List[RayActor]) -> PlacementGroup: # type: ignore - """return a placement group reference, the corresponding placement group could be scheduled or pending""" - bundles = [] - for replica in replicas: - bundles.append({"CPU": replica.num_cpus, "GPU": replica.num_gpus}) - - pg = ray.util.placement_group(bundles, strategy="SPREAD") - return pg - - -@dataclass -class ActorInfo: - """Used to store the information for restoring a failed command actor""" - - pg: PlacementGroup - replica: RayActor - actor: CommandActor - - -class RayDriver: - def __init__(self, replicas: List[RayActor]) -> None: - 
self.replicas = replicas - self.master_node_id: Optional[str] = None # the actor id of the master node - self.rank_0_address: Optional[str] = None - self.rank_0_port: Optional[int] = None - self.max_replicas: int = len(replicas) - self.min_replicas: int - if replicas[0].min_replicas is None: - self.min_replicas = self.max_replicas - else: - self.min_replicas = replicas[0].min_replicas # pyre-ignore[8] - - self.placement_groups: List[PlacementGroup] = ( - [] - ) # all the placement groups, shall never change - self.actor_info_of_id: Dict[str, ActorInfo] = ( - {} - ) # store the info used to recover an actor - self.active_tasks: List["ray.ObjectRef"] = [] # list of active tasks - - self.terminating: bool = False # if the job has finished and being terminated - self.command_actors_count: int = 0 # number of created command actors - - def init_placement_groups(self) -> None: - """Initialize all placement groups needed for this job""" - # find the actor specifications of a given placement group - replica_ix_of_pg: List[int] = [0] + list( - range( - self.min_replicas, - self.max_replicas + 1, - ) - ) - # create all the placement groups - initial_group = create_placement_group_async( - self.replicas[replica_ix_of_pg[0] : replica_ix_of_pg[1]] - ) - _logger.info("Waiting for minimum placement group to start.") - ready = initial_group.wait(100) - if not ready: # pragma: no cover - raise TimeoutError( - "Placement group creation timed out. Make sure " - "your cluster either has enough resources or use " - "an autoscaling cluster. Current resources " - "available: {}, resources requested by the " - "placement group: {}".format( - ray.available_resources(), initial_group.bundle_specs - ) - ) - self.placement_groups.append(initial_group) - for i in range(1, len(replica_ix_of_pg) - 1): - self.placement_groups.append( - create_placement_group_async( - self.replicas[replica_ix_of_pg[i] : replica_ix_of_pg[i + 1]] - ) - ) - - def pop_actor_info(self, actor_id: str) -> ActorInfo: - """Remove and return the info of a dead command actor""" - return self.actor_info_of_id.pop(actor_id) - - def create_and_schedule_actor(self, pg: PlacementGroup, replica: RayActor) -> None: - """create an command actor in the given placement group""" - # create the command actor - actor = CommandActor.options( # pyre-ignore[16] - placement_group=pg, - num_cpus=replica.num_cpus, - num_gpus=replica.num_gpus, - ).remote(replica.command, replica.env) - - # get the actor id of the created actor - actor_id = actor._actor_id.hex() - # launch a task to check if the actor is scheduled - self.active_tasks.append(actor.schedule.remote(actor_id)) - # save the actor info for recovering from node failures - self.actor_info_of_id[actor_id] = ActorInfo( - actor=actor, - pg=pg, - replica=replica, - ) - - def place_command_actors(self) -> None: - """Creating all command actors in all placement groups""" - # find the placement group index for a replica(actor's specification) - pg_ix_of_replica: List[int] = [ - max(0, i - self.min_replicas + 1) for i in range(len(self.replicas)) - ] - # create the actors - for i in range(len(self.replicas)): - pg_ix = pg_ix_of_replica[i] - pg = self.placement_groups[pg_ix] # find the created placement group - replica = self.replicas[i] - self.create_and_schedule_actor(pg, replica) - - def _step(self) -> bool: - """Handling command actor's return""" - result: RayResult # execution result - _logger.info(f"running ray.wait on {self.active_tasks}") - # ray.wait is partial waiting - completed_tasks, self.active_tasks = 
ray.wait(self.active_tasks) - # If a failure occurs the ObjectRef will be marked as completed. - # Calling ray.get will expose the failure as a RayActorError. - for object_ref in completed_tasks: - result = ray.get(object_ref) - if isinstance(result, CommandActorScheduled): - if not self.terminating: - actor = self.actor_info_of_id[result.id].actor - if self.master_node_id is None: - # make this actor be the master node - self.master_node_id = result.id - self.rank_0_address, self.rank_0_port = ray.get( - actor.get_actor_address_and_port.remote() # pyre-ignore - ) - self.active_tasks.append( - actor.exec_module.remote( # pyre-ignore - "localhost", 0, result.id - ) - ) - else: - self.active_tasks.append( - actor.exec_module.remote( - self.rank_0_address, self.rank_0_port, result.id - ) - ) - self.command_actors_count += 1 - elif isinstance(result, TaskCompleted): - self.terminating = ( - True # terminating the job, wait for all actors to finish - ) - self.command_actors_count -= 1 # 1 completed command actor - self.pop_actor_info(result.id) - if ( - self.command_actors_count == 0 - ): # all the command actors have finished - return True # is terminal - else: - raise RuntimeError( - f"Ray actor returns unknown type {type(result)}" - "This is most likely bug in torchx" - "Open issue at https://github.com/pytorch/torchx" - ) - return False - - def run(self) -> None: - """This is the main loop the ray driver, it executes the user script on the scheduled nodes, - and restart the failed nodes(node failures). The loop ends when all the actors that joining - the job exits.""" - self.terminating = False - self.command_actors_count = 0 - # Await return result of remote ray function and initialize new command actors - while len(self.active_tasks) > 0: - terminal = self._step() - if terminal: - break - - -def main() -> None: # pragma: no cover - actors: List[RayActor] = load_actor_json("actors.json") - driver = RayDriver(actors) - ray.init(address="auto", namespace="torchx-ray") - driver.init_placement_groups() - _logger.info("Successfully created placement groups") - driver.place_command_actors() - _logger.info("Successfully placed command actors") - _logger.info("Entering main loop, start executing the script on worker nodes") - driver.run() - - -if __name__ == "__main__": - main() diff --git a/torchx/schedulers/ray_scheduler.py b/torchx/schedulers/ray_scheduler.py deleted file mode 100644 index ca726dc01..000000000 --- a/torchx/schedulers/ray_scheduler.py +++ /dev/null @@ -1,454 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. 
- -import dataclasses -import json -import logging -import os -import re -import tempfile -import time -from dataclasses import dataclass, field -from datetime import datetime -from shutil import copy2, rmtree -from typing import ( # noqa - Any, - cast, - Dict, - Final, - Iterable, - List, - Optional, - Tuple, - TypedDict, -) - -import urllib3 - -from ray.autoscaler import sdk as ray_autoscaler_sdk -from ray.dashboard.modules.job.common import JobStatus -from ray.dashboard.modules.job.sdk import JobSubmissionClient - -from torchx.schedulers.api import ( - AppState, - DescribeAppResponse, - filter_regex, - ListAppResponse, - Scheduler, - split_lines, - Stream, -) -from torchx.schedulers.ids import make_unique -from torchx.schedulers.ray.ray_common import RayActor, TORCHX_RANK0_HOST -from torchx.specs import ( - AppDef, - AppDryRunInfo, - macros, - NONE, - ReplicaStatus, - Role, - RoleStatus, - runopts, -) -from torchx.workspace.dir_workspace import TmpDirWorkspaceMixin - - -class RayOpts(TypedDict, total=False): - cluster_config_file: Optional[str] - cluster_name: Optional[str] - dashboard_address: Optional[str] - working_dir: Optional[str] - requirements: Optional[str] - - -_logger: logging.Logger = logging.getLogger(__name__) - -_ray_status_to_torchx_appstate: Dict[JobStatus, AppState] = { - JobStatus.PENDING: AppState.PENDING, - JobStatus.RUNNING: AppState.RUNNING, - JobStatus.SUCCEEDED: AppState.SUCCEEDED, - JobStatus.FAILED: AppState.FAILED, - JobStatus.STOPPED: AppState.CANCELLED, -} - - -class _EnhancedJSONEncoder(json.JSONEncoder): - def default(self, o: RayActor): # pyre-ignore[3] - if dataclasses.is_dataclass(o): - return dataclasses.asdict(o) - return super().default(o) - - -def serialize( - actors: List[RayActor], dirpath: str, output_filename: str = "actors.json" -) -> None: - actors_json = json.dumps(actors, cls=_EnhancedJSONEncoder) - with open(os.path.join(dirpath, output_filename), "w") as tmp: - json.dump(actors_json, tmp) - - -@dataclass -class RayJob: - """Represents a job that should be run on a Ray cluster. - - Attributes: - app_id: - The unique ID of the application (a.k.a. job). - cluster_config_file: - The Ray cluster configuration file. - cluster_name: - The cluster name to use. - dashboard_address: - The existing dashboard IP address to connect to - working_dir: - The working directory to copy to the cluster - requirements: - The libraries to install on the cluster per requirements.txt - actors: - The Ray actors which represent the job to be run. This attribute is - dumped to a JSON file and copied to the cluster where `ray_main.py` - uses it to initiate the job. - """ - - app_id: str - working_dir: str - cluster_config_file: Optional[str] = None - cluster_name: Optional[str] = None - dashboard_address: Optional[str] = None - requirements: Optional[str] = None - actors: List[RayActor] = field(default_factory=list) - - -class RayScheduler( - TmpDirWorkspaceMixin, Scheduler[RayOpts, AppDef, AppDryRunInfo[RayJob]] -): - """ - RayScheduler is a TorchX scheduling interface to Ray. The job def - workers will be launched as Ray actors - - The job environment is specified by the TorchX workspace. Any files in - the workspace will be present in the Ray job unless specified in - ``.torchxignore``. Python dependencies will be read from the - ``requirements.txt`` file located at the root of the workspace unless - it's overridden via ``-c ...,requirements=foo/requirements.txt``. - - **Config Options** - - .. 
runopts:: - class: torchx.schedulers.ray_scheduler.create_scheduler - - **Compatibility** - - .. compatibility:: - type: scheduler - features: - cancel: true - logs: | - Partial support. Ray only supports a single log stream so - only a dummy "ray/0" combined log role is supported. - Tailing and time seeking are not supported. - distributed: true - describe: | - Partial support. RayScheduler will return job status but - does not provide the complete original AppSpec. - workspaces: true - mounts: false - elasticity: Partial support. Multi role jobs are not supported. - - """ - - def __init__( - self, session_name: str, ray_client: Optional[JobSubmissionClient] = None - ) -> None: - # NOTE: make sure any new init options are supported in create_scheduler(...) - super().__init__("ray", session_name) - - # w/o Final None check in _get_ray_client does not work as it pyre assumes mutability - self._ray_client: Final[Optional[JobSubmissionClient]] = ray_client - - def _get_ray_client( - self, job_submission_netloc: Optional[str] = None - ) -> JobSubmissionClient: - if self._ray_client is not None: - client_netloc = urllib3.util.parse_url( - self._ray_client.get_address() - ).netloc - if job_submission_netloc and job_submission_netloc != client_netloc: - raise ValueError( - f"client netloc ({client_netloc}) does not match job netloc ({job_submission_netloc})" - ) - return self._ray_client - elif os.getenv("RAY_ADDRESS"): - return JobSubmissionClient(os.getenv("RAY_ADDRESS")) - elif not job_submission_netloc: - raise Exception( - "RAY_ADDRESS env variable or a scheduler with an attached Ray JobSubmissionClient is expected." - " See https://docs.ray.io/en/latest/cluster/jobs-package-ref.html#job-submission-sdk for more info" - ) - return JobSubmissionClient(f"http://{job_submission_netloc}") - - # TODO: Add address as a potential CLI argument after writing ray.status() or passing in config file - def _run_opts(self) -> runopts: - opts = runopts() - opts.add( - "cluster_config_file", - type_=str, - required=False, - help="Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.", - ) - opts.add( - "cluster_name", - type_=str, - help="Override the configured cluster name.", - ) - opts.add( - "dashboard_address", - type_=str, - required=False, - default="127.0.0.1:8265", - help="Use ray status to get the dashboard address you will submit jobs against", - ) - opts.add("requirements", type_=str, help="Path to requirements.txt") - return opts - - def schedule(self, dryrun_info: AppDryRunInfo[RayJob]) -> str: - cfg: RayJob = dryrun_info.request - - # Create serialized actors for ray_driver.py - actors = cfg.actors - dirpath = cfg.working_dir - serialize(actors, dirpath) - - job_submission_addr: str = "" - if cfg.cluster_config_file: - job_submission_addr = ray_autoscaler_sdk.get_head_node_ip( - cfg.cluster_config_file - ) # pragma: no cover - elif cfg.dashboard_address: - job_submission_addr = cfg.dashboard_address - else: - raise RuntimeError( - "Either `dashboard_address` or `cluster_config_file` must be specified" - ) - - # 0. Create Job Client - client = self._get_ray_client(job_submission_netloc=job_submission_addr) - - # 1. Copy Ray driver utilities - current_directory = os.path.dirname(os.path.abspath(__file__)) - copy2(os.path.join(current_directory, "ray", "ray_driver.py"), dirpath) - copy2(os.path.join(current_directory, "ray", "ray_common.py"), dirpath) - runtime_env = {"working_dir": dirpath} - if cfg.requirements: - runtime_env["pip"] = cfg.requirements - - # 1. 
Submit Job via the Ray Job Submission API - try: - job_id: str = client.submit_job( - submission_id=cfg.app_id, - # we will pack, hash, zip, upload, register working_dir in GCS of ray cluster - # and use it to configure your job execution. - entrypoint="python3 ray_driver.py", - runtime_env=runtime_env, - ) - - finally: - if dirpath.startswith(tempfile.gettempdir()): - rmtree(dirpath) - - # Encode job submission client in job_id - return f"{job_submission_addr}-{job_id}" - - def _submit_dryrun(self, app: AppDef, cfg: RayOpts) -> AppDryRunInfo[RayJob]: - app_id = make_unique(app.name) - - working_dir = app.roles[0].image - if not os.path.exists(working_dir): - raise RuntimeError( - f"Role image must be a valid directory, got: {working_dir} " - ) - - requirements: Optional[str] = cfg.get("requirements") - if requirements is None: - workspace_reqs = os.path.join(working_dir, "requirements.txt") - if os.path.exists(workspace_reqs): - requirements = workspace_reqs - - cluster_cfg = cfg.get("cluster_config_file") - if cluster_cfg: - if not isinstance(cluster_cfg, str) or not os.path.isfile(cluster_cfg): - raise ValueError("The cluster configuration file must be a YAML file.") - - job: RayJob = RayJob( - app_id, - cluster_config_file=cluster_cfg, - requirements=requirements, - working_dir=working_dir, - ) - - else: # pragma: no cover - dashboard_address = cfg.get("dashboard_address") - job: RayJob = RayJob( - app_id=app_id, - dashboard_address=dashboard_address, - requirements=requirements, - working_dir=working_dir, - ) - job.cluster_name = cfg.get("cluster_name") - - for role in app.roles: - for replica_id in range(role.num_replicas): - # Replace the ${img_root}, ${app_id}, and ${replica_id} placeholders - # in arguments and environment variables. - replica_role = macros.Values( - img_root=role.image, - app_id=app_id, - replica_id=str(replica_id), - rank0_env=TORCHX_RANK0_HOST, - ).apply(role) - - actor = RayActor( - name=role.name, - min_replicas=role.min_replicas, - command=[replica_role.entrypoint] + replica_role.args, - env=replica_role.env, - num_cpus=max(1, replica_role.resource.cpu), - num_gpus=max(0, replica_role.resource.gpu), - ) - - job.actors.append(actor) - - if len(app.roles) > 1 and app.roles[0].min_replicas is not None: - raise ValueError("min_replicas is only supported with single role jobs") - - return AppDryRunInfo(job, repr) - - def _validate(self, app: AppDef, scheduler: str, cfg: RayOpts) -> None: - if scheduler != "ray": - raise ValueError( - f"An unknown scheduler backend '{scheduler}' has been passed to the Ray scheduler." - ) - - if app.metadata: - _logger.warning("The Ray scheduler does not use metadata information.") - - for role in app.roles: - if role.resource.capabilities: - _logger.warning( - "The Ray scheduler does not support custom resource capabilities." - ) - break - - for role in app.roles: - if role.port_map: - _logger.warning("The Ray scheduler does not support port mapping.") - break - - def wait_until_finish(self, app_id: str, timeout: int = 30) -> None: - """ - ``wait_until_finish`` waits until the specified job has finished - with a given timeout. This is intended for testing. Programmatic - usage should use the runner wait method instead. 
- """ - - start = time.time() - while time.time() - start <= timeout: - status_info = self._get_job_status(app_id) - status = status_info - if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}: - break - time.sleep(1) - - def _parse_app_id(self, app_id: str) -> Tuple[str, str]: - # find index of '-' in the first :\d+- - m = re.search(r":\d+-", app_id) - if m: - sep = m.span()[1] - addr = app_id[: sep - 1] - app_id = app_id[sep:] - return addr, app_id - - addr, _, app_id = app_id.partition("-") - return addr, app_id - - def _cancel_existing(self, app_id: str) -> None: # pragma: no cover - addr, app_id = self._parse_app_id(app_id) - client = self._get_ray_client(job_submission_netloc=addr) - client.stop_job(app_id) - - def _get_job_status(self, app_id: str) -> JobStatus: - addr, app_id = self._parse_app_id(app_id) - client = self._get_ray_client(job_submission_netloc=addr) - status = client.get_job_status(app_id) - if isinstance(status, str): - return cast(JobStatus, status) - return status.status - - def describe(self, app_id: str) -> Optional[DescribeAppResponse]: - job_status_info = self._get_job_status(app_id) - state = _ray_status_to_torchx_appstate[job_status_info] - roles = [Role(name="ray", num_replicas=1, image="")] - - # get ip_address and put it in hostname - - roles_statuses = [ - RoleStatus( - role="ray", - replicas=[ - ReplicaStatus( - id=0, - role="ray", - hostname=NONE, - state=state, - ) - ], - ) - ] - return DescribeAppResponse( - app_id=app_id, - state=state, - msg=job_status_info, - roles_statuses=roles_statuses, - roles=roles, - ) - - def log_iter( - self, - app_id: str, - role_name: Optional[str] = None, - k: int = 0, - regex: Optional[str] = None, - since: Optional[datetime] = None, - until: Optional[datetime] = None, - should_tail: bool = False, - streams: Optional[Stream] = None, - ) -> Iterable[str]: - # TODO: support tailing, streams etc.. - addr, app_id = self._parse_app_id(app_id) - client: JobSubmissionClient = self._get_ray_client(job_submission_netloc=addr) - logs: str = client.get_job_logs(app_id) - iterator = split_lines(logs) - if regex: - return filter_regex(regex, iterator) - return iterator - - def list(self) -> List[ListAppResponse]: - client = self._get_ray_client() - jobs = client.list_jobs() - netloc = urllib3.util.parse_url(client.get_address()).netloc - return [ - ListAppResponse( - app_id=f"{netloc}-{details.submission_id}", - state=_ray_status_to_torchx_appstate[details.status], - ) - for details in jobs - ] - - -def create_scheduler( - session_name: str, ray_client: Optional[JobSubmissionClient] = None, **kwargs: Any -) -> "RayScheduler": - return RayScheduler(session_name=session_name, ray_client=ray_client) diff --git a/torchx/schedulers/test/.gitignore b/torchx/schedulers/test/.gitignore deleted file mode 100644 index 87b49fbca..000000000 --- a/torchx/schedulers/test/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -actors.json -ray_common.py -ray_driver.py -staging diff --git a/torchx/schedulers/test/ray_scheduler_test.py b/torchx/schedulers/test/ray_scheduler_test.py deleted file mode 100644 index 4f847025c..000000000 --- a/torchx/schedulers/test/ray_scheduler_test.py +++ /dev/null @@ -1,697 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. 
- -import os -import tempfile -from contextlib import contextmanager -from dataclasses import dataclass -from shutil import copy2 -from typing import Any, cast, Iterable, Iterator, List, Optional, Type -from unittest import TestCase -from unittest.mock import MagicMock, patch - -import ray -from ray.cluster_utils import Cluster -from ray.dashboard.modules.job.sdk import JobSubmissionClient -from ray.util.placement_group import remove_placement_group - -from torchx.schedulers import get_scheduler_factories -from torchx.schedulers.api import DescribeAppResponse, ListAppResponse -from torchx.schedulers.ray import ray_driver -from torchx.schedulers.ray.ray_common import RayActor -from torchx.schedulers.ray_scheduler import ( - _logger, - RayJob, - RayOpts, - RayScheduler, - serialize, -) -from torchx.specs import AppDef, AppDryRunInfo, Resource, Role, runopts - - -class RaySchedulerRegistryTest(TestCase): - def test_get_schedulers_returns_ray_scheduler(self) -> None: - schedulers = get_scheduler_factories() - - self.assertIn("ray", schedulers) - - scheduler = schedulers["ray"]("test_session") - - self.assertIsInstance(scheduler, RayScheduler) - - ray_scheduler = cast(RayScheduler, scheduler) - - self.assertEqual(ray_scheduler.backend, "ray") - self.assertEqual(ray_scheduler.session_name, "test_session") - - -class RaySchedulerTest(TestCase): - def setUp(self) -> None: - self._scripts = ["dummy1.py", "dummy2.py"] - - self.tempdir = tempfile.TemporaryDirectory() - - self._app_def = AppDef( - name="dummy_app", - roles=[ - Role( - name="dummy_role1", - image=self.tempdir.name, - entrypoint="dummy_entrypoint1", - args=["arg1", self._scripts[0], "arg2"], - num_replicas=3, - env={"dummy_env": "dummy_value"}, - resource=Resource(cpu=2, gpu=3, memMB=0), - ), - Role( - name="dummy_role2", - image=self.tempdir.name, - entrypoint="dummy_entrypoint2", - args=["arg3", "arg4", self._scripts[1]], - ), - ], - ) - - self._run_cfg = RayOpts( - { - "cluster_config_file": "dummy_file", - "cluster_name": "dummy_name", - "working_dir": None, - "requirements": None, - } - ) - - # mock validation step so that instantiation doesn't fail due to inability to reach dashboard - JobSubmissionClient._check_connection_and_version = MagicMock() - - self._scheduler = RayScheduler("test_session") - - self._isfile_patch = patch("torchx.schedulers.ray_scheduler.os.path.isfile") - - self._mock_isfile = self._isfile_patch.start() - self._mock_isfile.return_value = True - - def tearDown(self) -> None: - self.tempdir.cleanup() - self._isfile_patch.stop() - - def test_init_sets_session_and_backend_name(self) -> None: - self.assertEqual(self._scheduler.backend, "ray") - self.assertEqual(self._scheduler.session_name, "test_session") - - def test_run_opts_returns_expected_options(self) -> None: - opts: runopts = self._scheduler.run_opts() - - @dataclass - class Option: - name: str - opt_type: Type - is_required: bool = False - default: Any = None - - def assert_option(expected_opt: Option) -> None: - opt = opts.get(expected_opt.name) - - self.assertIsNotNone(opt) - - self.assertEqual(opt.opt_type, expected_opt.opt_type) - self.assertEqual(opt.is_required, expected_opt.is_required) - - if expected_opt.default is None: - self.assertIsNone(opt.default) - else: - self.assertEqual(opt.default, expected_opt.default) - - expected_opts = [ - Option("cluster_config_file", str, is_required=False), - Option("cluster_name", str), - Option("dashboard_address", str, default="127.0.0.1:8265"), - Option("requirements", str, is_required=False), - ] - - 
self.assertEqual(len(opts), len(expected_opts)) - - for expected_opt in expected_opts: - assert_option(expected_opt) - - def test_validate_does_not_raise_error_and_does_not_log_warning(self) -> None: - with self.assertLogs(_logger, "WARNING") as cm: - self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) - - _logger.warning("dummy log") - - self.assertEqual(len(cm.records), 1) - - def test_validate_raises_error_if_backend_name_is_not_ray(self) -> None: - with self.assertRaisesRegex( - ValueError, - r"^An unknown scheduler backend 'dummy' has been passed to the Ray scheduler.$", - ): - self._scheduler._validate( - self._app_def, scheduler="dummy", cfg=self._run_cfg - ) - - @contextmanager - def _assert_log_message(self, level: str, msg: str) -> Iterator[None]: - with self.assertLogs(_logger) as cm: - yield - - self.assertEqual(len(cm.records), 1) - - log_record = cm.records[0] - - self.assertEqual(log_record.levelname, level) - self.assertEqual(log_record.message, msg) - - def test_validate_warns_when_app_def_contains_metadata(self) -> None: - self._app_def.metadata["dummy_key"] = "dummy_value" - - with self._assert_log_message( - "WARNING", "The Ray scheduler does not use metadata information." - ): - self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) - - def test_validate_warns_when_role_contains_resource_capability(self) -> None: - self._app_def.roles[1].resource.capabilities["dummy_cap1"] = 1 - self._app_def.roles[1].resource.capabilities["dummy_cap2"] = 2 - - with self._assert_log_message( - "WARNING", - "The Ray scheduler does not support custom resource capabilities.", - ): - self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) - - def test_validate_warns_when_role_contains_port_map(self) -> None: - self._app_def.roles[1].port_map["dummy_map1"] = 1 - self._app_def.roles[1].port_map["dummy_map2"] = 2 - - with self._assert_log_message( - "WARNING", "The Ray scheduler does not support port mapping." 
- ): - self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) - - def test_submit_dryrun_raises_error_if_cluster_config_file_is_not_str( - self, - ) -> None: - # pyre-fixme: Expects string type - self._run_cfg["cluster_config_file"] = 1 - - with self.assertRaisesRegex( - ValueError, - r"^The cluster configuration file must be a YAML file.$", - ): - self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - def test_submit_dryrun_raises_error_if_cluster_config_file_is_not_found( - self, - ) -> None: - self._mock_isfile.return_value = False - - with self.assertRaisesRegex( - ValueError, - r"^The cluster configuration file must be a YAML file.$", - ): - self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - # pyre-ignore[2]: Parameter `value` must have a type other than `Any` - def _assert_config_value(self, name: str, value: Any, type_name: str) -> None: - # pyre-fixme: TypedDict indexes by string literal - self._run_cfg[name] = value - - with self.assertRaisesRegex( - TypeError, - rf"^The configuration value '{name}' must be of type {type_name}.$", - ): - self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - def _assert_submit_dryrun_constructs_job_definition(self) -> None: - run_info = self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - job = run_info.request - - self.assertTrue(job.app_id.startswith(self._app_def.name)) - self.assertGreater(len(job.app_id), len(self._app_def.name)) - - self.assertEqual( - job.cluster_config_file, self._run_cfg.get("cluster_config_file") - ) - self.assertEqual(job.cluster_name, self._run_cfg.get("cluster_name")) - - actor_roles = [] - for role in self._app_def.roles: - actor_roles += [role] * role.num_replicas - - self.assertEqual(len(job.actors), len(actor_roles)) - - for actor, role in zip(job.actors, actor_roles): - self.assertEqual(actor.name, role.name) - self.assertEqual(actor.command, [role.entrypoint] + role.args) - self.assertEqual(actor.env, role.env) - self.assertEqual(actor.num_cpus, max(1, role.resource.cpu)) - self.assertEqual(actor.num_gpus, max(0, role.resource.gpu)) - - def test_submit_dryrun_constructs_job_definition(self) -> None: - self._assert_submit_dryrun_constructs_job_definition() - - self._run_cfg["cluster_name"] = None - self._run_cfg["working_dir"] = None - self._run_cfg["requirements"] = None - - self._assert_submit_dryrun_constructs_job_definition() - - def test_submit_dryrun_constructs_actor_command(self) -> None: - run_info = self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - job = run_info.request - - self.assertEqual( - job.actors[0].command, - ["dummy_entrypoint1", "arg1", "dummy1.py", "arg2"], - ) - - def test_no_dir(self) -> None: - app = AppDef( - name="dummy_app", - roles=[ - Role( - name="dummy_role1", - image="invalid_path", - ), - ], - ) - with self.assertRaisesRegex( - RuntimeError, "Role image must be a valid directory, got: invalid_path" - ): - self._scheduler._submit_dryrun(app, cfg={}) - - def test_requirements(self) -> None: - with tempfile.TemporaryDirectory() as path: - reqs = os.path.join(path, "requirements.txt") - with open(reqs, "w") as f: - f.write("asdf") - - app = AppDef( - name="app", - roles=[ - Role( - name="role", - image=path, - ), - ], - ) - req = self._scheduler._submit_dryrun(app, cfg={}) - job = req.request - self.assertEqual(job.requirements, reqs) - - def test_parse_app_id(self) -> None: - test_addr_appid = [ - ( - "0.0.0.0:1234-app_id", - "0.0.0.0:1234", - "app_id", - ), # (full address, address:port, app_id) - 
("addr-of-cluster:1234-app-id", "addr-of-cluster:1234", "app-id"), - ("www.test.com:1234-app:id", "www.test.com:1234", "app:id"), - ("foo", "foo", ""), - ("foo-bar-bar", "foo", "bar-bar"), - ] - for test_example, addr, app_id in test_addr_appid: - parsed_addr, parsed_appid = self._scheduler._parse_app_id(test_example) - self.assertEqual(parsed_addr, addr) - self.assertEqual(parsed_appid, app_id) - - def test_list_throws_without_address(self) -> None: - if "RAY_ADDRESS" in os.environ: - del os.environ["RAY_ADDRESS"] - with self.assertRaisesRegex(Exception, "RAY_ADDRESS env variable"): - self._scheduler.list() - - def test_list_doesnt_throw_with_client(self) -> None: - ray_client = JobSubmissionClient(address="https://test.com") - ray_client.list_jobs = MagicMock(return_value=[]) - _scheduler_with_client = RayScheduler("client_session", ray_client) - _scheduler_with_client.list() # testing for success (should not throw exception) - - def test_min_replicas(self) -> None: - app = AppDef( - name="app", - roles=[ - Role( - name="role", - image="/tmp/", - num_replicas=2, - ), - ], - ) - req = self._scheduler._submit_dryrun(app, cfg={}) - job = req.request - self.assertEqual(job.actors[0].min_replicas, None) - - app.roles[0].min_replicas = 1 - req = self._scheduler._submit_dryrun(app, cfg={}) - job = req.request - self.assertEqual(job.actors[0].min_replicas, 1) - - app.roles.append( - Role( - name="role", - image="/tmp/", - num_replicas=2, - min_replicas=1, - ) - ) - with self.assertRaisesRegex( - ValueError, "min_replicas is only supported with single role jobs" - ): - self._scheduler._submit_dryrun(app, cfg={}) - - def test_nonmatching_address(self) -> None: - ray_client = JobSubmissionClient(address="https://test.address.com") - _scheduler_with_client = RayScheduler("client_session", ray_client) - app = AppDef( - name="app", - roles=[ - Role(name="role", image="."), - ], - ) - with self.assertRaisesRegex( - ValueError, "client netloc .* does not match job netloc .*" - ): - _scheduler_with_client.submit(app=app, cfg={}) - - def _assertDictContainsSubset( - self, - expected: dict[str, Any], - actual: dict[str, Any], - msg: Optional[str] = None, - ) -> None: - # NB: implement unittest.TestCase.assertDictContainsSubsetNew() since it was removed in python-3.11 - for key, value in expected.items(): - self.assertIn(key, actual, msg) - self.assertEqual(actual[key], value, msg) - - def test_client_with_headers(self) -> None: - # This tests only one option for the client. Different versions may have more options available. 
- headers = {"Authorization": "Bearer: token"} - ray_client = JobSubmissionClient( - address="https://test.com", headers=headers, verify=False - ) - _scheduler_with_client = RayScheduler("client_session", ray_client) - scheduler_client = _scheduler_with_client._get_ray_client() - self._assertDictContainsSubset(scheduler_client._headers, headers) - - -class RayClusterSetup: - _instance = None # pyre-ignore - _cluster = None # pyre-ignore - - def __new__(cls): # pyre-ignore - if cls._instance is None: - cls._instance = super(RayClusterSetup, cls).__new__(cls) - ray.shutdown() - cls._cluster = Cluster( - initialize_head=True, - head_node_args={ - "num_cpus": 1, - }, - ) - cls._cluster.connect() # connect before any node changes - cls._cluster.add_node() # total of 2 cpus available - cls.reference_count: int = 4 - return cls._instance - - @property - def workers(self) -> List[object]: - return list(self._cluster.worker_nodes) - - def add_node(self, num_cpus: int = 1) -> None: - # add 1 node with 2 cpus to the cluster - self._cluster.add_node(num_cpus=num_cpus) - - def remove_node(self) -> None: - # randomly remove 1 node from the cluster - self._cluster.remove_node(self.workers[0]) - - def decrement_reference(self) -> None: - self.reference_count -= 1 - if self.reference_count == 0: - self.teardown_ray_cluster() - - def teardown_ray_cluster(self) -> None: - ray.shutdown() - self._cluster.shutdown() - del os.environ["RAY_ADDRESS"] - - -class RayDriverTest(TestCase): - def test_actors_serialize(self) -> None: - actor1 = RayActor( - name="test_actor_1", - command=["python", "1", "2"], - env={"fake": "1"}, - min_replicas=2, - ) - actor2 = RayActor( - name="test_actor_2", - command=["python", "3", "4"], - env={"fake": "2"}, - min_replicas=2, - ) - actors = [actor1, actor2] - current_dir = os.path.dirname(os.path.realpath(__file__)) - serialize(actors, current_dir) - - loaded_actor = ray_driver.load_actor_json( - os.path.join(current_dir, "actors.json") - ) - self.assertEqual(loaded_actor, actors) - - def test_unknown_result(self) -> None: - actor1 = RayActor( - name="test_actor_1", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_1")', - ], - env={"fake": "1"}, - ) - actors = [ - actor1, - ] - driver = ray_driver.RayDriver(actors) - ray_cluster_setup = RayClusterSetup() - self.assertEqual(driver.min_replicas, 1) - self.assertEqual(driver.max_replicas, 1) - - @ray.remote - def f() -> int: - return 1 - - driver.active_tasks = [f.remote()] - with self.assertRaises(RuntimeError): - driver._step() - - ray_cluster_setup.decrement_reference() - - def test_ray_driver_gang(self) -> None: - """Test launching a gang scheduling job""" - actor1 = RayActor( - name="test_actor_1", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_1")', - ], - env={"fake": "1"}, - min_replicas=2, - ) - actor2 = RayActor( - name="test_actor_2", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_2")', - ], - env={"fake": "2"}, - min_replicas=2, - ) - actors = [actor1, actor2] - - driver = ray_driver.RayDriver(actors) - ray_cluster_setup = RayClusterSetup() - - # test init_placement_groups - driver.init_placement_groups() - self.assertEqual(len(driver.placement_groups), 1) - self.assertEqual(len(driver.active_tasks), 0) - - driver.place_command_actors() - self.assertEqual(len(driver.active_tasks), 2) - self.assertEqual(len(driver.actor_info_of_id), 2) - - driver.run() # execute commands on command actors - self.assertEqual( - len(driver.active_tasks), 
0 - ) # wait util all active tasks finishes - self.assertEqual(driver.command_actors_count, 0) - self.assertIsNotNone(driver.rank_0_address) - self.assertIsNotNone(driver.rank_0_port) - - # ray.available_resources()['CPU'] == 0 - for pg in driver.placement_groups: - # clear used placement groups - remove_placement_group(pg) - # ray.available_resources()['CPU'] == 2 - - ray_cluster_setup.decrement_reference() - - def test_ray_driver_elasticity(self) -> None: - """Test launching an elasticity job""" - actor1 = RayActor( - name="test_actor_1", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_elasticity_1")', - ], - env={"fake": "1"}, - min_replicas=1, - ) - actor2 = RayActor( - name="test_actor_2", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_elasticity_2")', - ], - env={"fake": "2"}, - min_replicas=1, - ) - actors = [actor1, actor2] - - driver = ray_driver.RayDriver(actors) - ray_cluster_setup = RayClusterSetup() - ray_cluster_setup.remove_node() # Remove 1 cpu, should have 1 cpu in the cluster - - # 1. test init_placement_groups - driver.init_placement_groups() - self.assertEqual(len(driver.placement_groups), 2) # 2 placement groups created - self.assertEqual(len(driver.active_tasks), 0) - created, pending = ray.wait( - [driver.placement_groups[0].ready(), driver.placement_groups[1].ready()] - ) - self.assertEqual(len(created), 1) - self.assertEqual(len(pending), 1) - - # 2. test place_command_actors - driver.place_command_actors() - self.assertEqual(len(driver.active_tasks), 2) # 2 command actors - self.assertEqual(len(driver.actor_info_of_id), 2) - self.assertEqual(driver.command_actors_count, 0) - - # 3-1 - teriminal = driver._step() # actor 1 scheduled, execute the script - self.assertEqual(teriminal, False) - self.assertEqual(len(driver.active_tasks), 2) # actor1 should be finished - self.assertEqual(driver.command_actors_count, 1) - self.assertIsNotNone(driver.rank_0_address) - self.assertIsNotNone(driver.rank_0_port) - - # 3-2 - terminal = ( - driver._step() - ) # actor 1 finished, actor 2 has been scheduled yet, usually, the driver stops here - self.assertEqual(terminal, True) - self.assertEqual(driver.command_actors_count, 0) - self.assertEqual(len(driver.active_tasks), 1) # actor schedule task - self.assertEqual(driver.terminating, True) - - ray_cluster_setup.add_node() # add 1 cpu to the cluster - # 3-3 - teriminal = ( - driver._step() - ) # pg 2 becomes available, but actor 2 shouldn't be executed - self.assertEqual(teriminal, False) - self.assertEqual(len(driver.active_tasks), 0) # actor1 should be finished - self.assertEqual(driver.command_actors_count, 0) - - for pg in driver.placement_groups: - # clear used placement groups - remove_placement_group(pg) - - ray_cluster_setup.decrement_reference() - - -class RayIntegrationTest(TestCase): - def test_ray_cluster(self) -> None: - ray_cluster_setup = RayClusterSetup() - ray_scheduler = self.setup_ray_cluster() - self.assertTrue(ray.is_initialized()) - - job_id = self.schedule_ray_job(ray_scheduler) - self.assertIsNotNone(job_id) - - ray_scheduler.wait_until_finish(job_id, 100) - - logs = self.check_logs(ray_scheduler=ray_scheduler, app_id=job_id) - print(logs) - self.assertIsNotNone(logs) - - status = self.describe(ray_scheduler, job_id) - self.assertIsNotNone(status) - - apps = self.list(ray_scheduler) - self.assertEqual(len(apps), 2) - self.assertEqual(apps[0].app_id, job_id) - - ray_cluster_setup.decrement_reference() - - def setup_ray_cluster(self) -> RayScheduler: 
- ray_scheduler = RayScheduler(session_name="test") - return ray_scheduler - - def schedule_ray_job(self, ray_scheduler: RayScheduler, app_id: str = "123") -> str: - current_dir = os.path.dirname(os.path.realpath(__file__)) - # Ray packaging honours .gitignore file -> create staging directory just for packaging: - # - job will use it as a cwd and copy ray_driver.py - # - test will copy training script to the same destination - staging_dir = os.path.join(current_dir, "staging") - os.makedirs(staging_dir, exist_ok=True) - copy2(os.path.join(current_dir, "train.py"), staging_dir) - actors = [ - RayActor( - name="ddp", - num_cpus=1, - command=[os.path.join(staging_dir, "train.py")], - min_replicas=2, - ), - RayActor( - name="ddp", - num_cpus=1, - command=[os.path.join(staging_dir, "train.py")], - min_replicas=2, - ), - ] - - ray_job = RayJob( - app_id=app_id, - dashboard_address="127.0.0.1:8265", - actors=actors, - working_dir=staging_dir, - ) - app_info = AppDryRunInfo(ray_job, repr) - job_id = ray_scheduler.schedule(app_info) - return job_id - - def describe( - self, ray_scheduler: RayScheduler, app_id: str = "123" - ) -> Optional[DescribeAppResponse]: - return ray_scheduler.describe(app_id) - - def check_logs( - self, ray_scheduler: RayScheduler, app_id: str = "123" - ) -> Iterable[str]: - return ray_scheduler.log_iter(app_id=app_id) - - def list(self, ray_scheduler: RayScheduler) -> List[ListAppResponse]: - os.environ["RAY_ADDRESS"] = "http://127.0.0.1:8265" - return ray_scheduler.list()
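For readers tracking what this removal means in practice: the deleted RayScheduler was a thin wrapper over Ray's Job Submission SDK (see the removed schedule() method above, which called JobSubmissionClient.submit_job with a packed working_dir). The sketch below is not part of this patch; it only illustrates, under assumptions, how an equivalent submission looks when made against Ray directly. The dashboard address, ./workspace directory, and train.py entrypoint are hypothetical placeholders, and the import path is the one used by the removed code.

    # Minimal sketch (assumptions noted above): submit a working directory to an
    # existing Ray cluster using the same SDK calls the removed scheduler wrapped.
    from ray.dashboard.modules.job.sdk import JobSubmissionClient

    client = JobSubmissionClient("http://127.0.0.1:8265")  # assumed dashboard address
    job_id = client.submit_job(
        entrypoint="python train.py",                       # assumed user script
        runtime_env={"working_dir": "./workspace"},         # packed and uploaded to the cluster
    )
    print(client.get_job_status(job_id))  # PENDING / RUNNING / SUCCEEDED / FAILED / STOPPED
    print(client.get_job_logs(job_id))    # single combined log stream, as the old "ray/0" role exposed

Users who prefer the previous TorchX-level integration can stay on a release that predates this patch, where `pip install "torchx[ray]"` and the `ray` scheduler backend are still available.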