5 changes: 4 additions & 1 deletion .github/workflows/slurm-local-integration-tests.yaml
@@ -6,8 +6,11 @@ on:
- main
pull_request:


env:
SLURM_VERSION: 21.08.6
# slurm tag should be one of https://github.com/SchedMD/slurm/tags
SLURM_TAG: slurm-23-11-11-1
SLURM_VERSION: 23.11.11

jobs:
slurm:
4 changes: 4 additions & 0 deletions .gitignore
@@ -141,3 +141,7 @@ wordlist.dic
pipeline.yaml

/codecov
.python-version
enhanced_distributed_pipeline.yaml
task_configs_pipeline.yaml
local_outputs
3 changes: 2 additions & 1 deletion .pyre_configuration
@@ -18,5 +18,6 @@
"stubs"
],
"strict": true,
"version": "0.0.101732536891"
"enable_strict_any_check": true,
"version": "0.0.101749035478"
}
17 changes: 9 additions & 8 deletions dev-requirements.txt
@@ -12,9 +12,10 @@ google-cloud-logging==3.10.0
google-cloud-runtimeconfig==0.34.0
hydra-core
ipython
kfp==1.8.22
# pin protobuf to the version that is required by kfp
protobuf==3.20.3
kfp>=2.8.0
kfp-kubernetes>=1.4.0 # For Kubernetes-specific features in KFP v2
# kfp v2 is compatible with protobuf 4+
protobuf>=4.21.0
mlflow-skinny
moto~=5.0.8
pyre-extensions
@@ -30,6 +31,7 @@ torchmetrics==1.6.3
torchserve>=0.10.0
torchtext==0.18.0
torchvision==0.22.0
typing-extensions
ts==0.5.1
ray[default]
wheel
@@ -40,8 +42,7 @@ lintrunner-adapters


# reduce backtracking
grpcio==1.62.1
grpcio-status==1.48.1
googleapis-common-protos==1.63.0
google-api-core==2.18.0
protobuf==3.20.3 # kfp==1.8.22 needs protobuf < 4
grpcio>=1.62.1
grpcio-status>=1.48.1
googleapis-common-protos>=1.63.0
google-api-core>=2.18.0
2 changes: 0 additions & 2 deletions requirements.txt
@@ -1,6 +1,4 @@
pyre-extensions
docstring-parser>=0.8.1
importlib-metadata
pyyaml
docker
filelock
5 changes: 4 additions & 1 deletion setup.py
@@ -87,7 +87,10 @@ def get_nightly_version():
"google-cloud-logging>=3.0.0",
"google-cloud-runtimeconfig>=0.33.2",
],
"kfp": ["kfp==1.6.2"],
# KFP 1.x is no longer supported; see https://github.com/pytorch/torchx/issues/123
"kfp": [
"kfp>=2.8.0"
], # optional: required for Kubeflow Pipelines integration
"kubernetes": ["kubernetes>=11"],
"ray": ["ray>=1.12.1"],
"dev": dev_reqs,
6 changes: 3 additions & 3 deletions torchx/components/dist.py
@@ -132,9 +132,6 @@ def spmd(
j: {nnodes}x{nproc_per_node}. For GPU hosts, omitting nproc_per_node will infer it from the GPU count on the host
env: environment variables to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)
max_retries: the number of scheduler retries allowed
rdzv_port: the port on rank0's host to use for hosting the c10d store used for rendezvous.
Only takes effect when running multi-node. When running single node, this parameter
is ignored and a random free port is chosen.
mounts: (for docker based runs only) mounts to mount into the worker environment/container
(ex. type=<bind/volume>,src=/host,dst=/job[,readonly]).
debug: whether to run with preset debug flags enabled
@@ -174,6 +171,7 @@ def ddp(
max_retries: int = 0,
rdzv_port: int = 29500,
rdzv_backend: str = "c10d",
rdzv_conf: Optional[str] = None,
mounts: Optional[List[str]] = None,
debug: bool = False,
tee: int = 3,
@@ -208,6 +206,7 @@ def ddp(
Only takes effect when running multi-node. When running single node, this parameter
is ignored and a random free port is chosen.
rdzv_backend: the rendezvous backend to use. Only takes effect when running multi-node.
rdzv_conf: the additional rendezvous configuration to use (ex. join_timeout=600,close_timeout=600,timeout=600).
mounts: mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]).
See scheduler documentation for more info.
debug: whether to run with preset debug flags enabled
@@ -258,6 +257,7 @@ def ddp(
"torchrun",
"--rdzv_backend",
rdzv_backend,
*(["--rdzv_conf", rdzv_conf] if rdzv_conf is not None else []),
"--rdzv_endpoint",
rdzv_endpoint,
"--rdzv_id",
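To show how the new rdzv_conf plumbing is meant to be used, here is a minimal sketch; ddp, rdzv_conf, and the args layout come from the diff above, while the script name, job size, and timeout values are illustrative assumptions.

from torchx.components.dist import ddp

# Minimal sketch: pass extra rendezvous settings through the new
# rdzv_conf parameter. The script name, job size, and timeouts are
# illustrative assumptions.
app = ddp(
    script="train.py",  # hypothetical entrypoint
    j="2x4",  # 2 nodes x 4 processes per node
    rdzv_backend="c10d",
    rdzv_conf="join_timeout=600,close_timeout=600,timeout=600",
)

# The generated torchrun command string (roles[0].args[1], as exercised
# in dist_test.py below) now contains:
#   --rdzv_conf join_timeout=600,close_timeout=600,timeout=600
print(app.roles[0].args[1])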
5 changes: 2 additions & 3 deletions torchx/components/structured_arg.py
@@ -30,8 +30,6 @@
from pathlib import Path
from typing import Optional

from pyre_extensions import none_throws

from torchx import specs


@@ -148,7 +146,8 @@ def parse_from(
if m: # use the last module name
run_name = m.rpartition(".")[2]
else: # use script name w/ no extension
run_name = Path(none_throws(script)).stem
assert script, "`script` can't be `None` here due to checks above"
run_name = Path(script).stem
return StructuredNameArgument(
experiment_name or default_experiment_name, run_name
)
4 changes: 3 additions & 1 deletion torchx/components/test/dist_test.py
@@ -41,8 +41,10 @@ def test_ddp_debug(self) -> None:
self.assertEqual(env[k], v)

def test_ddp_rdzv_backend_static(self) -> None:
app = ddp(script="foo.py", rdzv_backend="static")
rdzv_conf = "join_timeout=600,close_timeout=600,timeout=600"
app = ddp(script="foo.py", rdzv_backend="static", rdzv_conf=rdzv_conf)
cmd = app.roles[0].args[1]
self.assertTrue(f"--rdzv_conf {rdzv_conf}" in cmd)
self.assertTrue("--rdzv_backend static" in cmd)
self.assertTrue("--node_rank" in cmd)

58 changes: 32 additions & 26 deletions torchx/examples/pipelines/kfp/advanced_pipeline.py
@@ -28,15 +28,15 @@
import sys
from typing import Dict

import kfp
import torchx

from kfp import compiler, dsl
from torchx import specs
from torchx.components.dist import ddp as dist_ddp
from torchx.components.serve import torchserve
from torchx.components.utils import copy as utils_copy, python as utils_python
from torchx.pipelines.kfp.adapter import container_from_app


parser = argparse.ArgumentParser(description="example kfp pipeline")

# %%
@@ -238,48 +238,54 @@
# cluster.
#
# The KFP adapter currently doesn't track the input and outputs so the
# containers need to have their dependencies specified via `.after()`.
# containers need to have their dependencies specified.
#
# We call `.set_tty()` to make the logs from the components more responsive for
# example purposes.
# We no longer need to call `.set_tty()` as that was a v1 feature.


@dsl.pipeline(
name="TorchX Advanced Pipeline",
description="Advanced KFP pipeline with TorchX components",
)
def pipeline() -> None:
# container_from_app creates a KFP container from the TorchX app
# container_from_app creates a KFP v2 task from the TorchX app
# definition.
copy = container_from_app(copy_app)
copy.container.set_tty()
copy_task = container_from_app(copy_app)
copy_task.set_display_name("Download Data")

datapreproc = container_from_app(datapreproc_app)
datapreproc.container.set_tty()
datapreproc.after(copy)
datapreproc_task = container_from_app(datapreproc_app)
datapreproc_task.set_display_name("Preprocess Data")
# In KFP v2, dependencies are inferred automatically from data flow.
# These tasks exchange no outputs/inputs, so order them explicitly with .after().
datapreproc_task.after(copy_task)

# For the trainer we want to log that UI metadata so you can access
# tensorboard from the UI.
trainer = container_from_app(trainer_app, ui_metadata=ui_metadata)
trainer.container.set_tty()
trainer.after(datapreproc)
trainer_task = container_from_app(trainer_app, ui_metadata=ui_metadata)
trainer_task.set_display_name("Train Model")
trainer_task.after(datapreproc_task)

if False:
serve = container_from_app(serve_app)
serve.container.set_tty()
serve.after(trainer)
serve_task = container_from_app(serve_app)
serve_task.set_display_name("Serve Model")
serve_task.after(trainer_task)

if False:
# Serve and interpret only require the trained model so we can run them
# in parallel to each other.
interpret = container_from_app(interpret_app)
interpret.container.set_tty()
interpret.after(trainer)
interpret_task = container_from_app(interpret_app)
interpret_task.set_display_name("Interpret Model")
interpret_task.after(trainer_task)


kfp.compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)
if __name__ == "__main__":
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
print(f.read())
with open("pipeline.yaml", "rt") as f:
print(f.read())

# %%
# Once this has all run you should have a pipeline file (typically
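After compiling, the pipeline can also be submitted programmatically. A minimal sketch using the KFP v2 client follows; the host URL is a placeholder assumption, not part of this PR.

import kfp

# Minimal sketch: upload and run the compiled pipeline with the KFP v2
# client. The host URL is a placeholder assumption.
client = kfp.Client(host="http://localhost:8080")
result = client.create_run_from_pipeline_package(
    "pipeline.yaml",
    arguments={},  # this example pipeline takes no parameters
)
print(result.run_id)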
40 changes: 26 additions & 14 deletions torchx/examples/pipelines/kfp/dist_pipeline.py
@@ -12,17 +12,21 @@
======================================

This is an example KFP pipeline that uses resource_from_app to launch a
distributed operator using the kubernetes/volcano job scheduler. This only works
distributed job using the kubernetes/volcano job scheduler. This only works
in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them.
"""

import kfp
from kfp import compiler, dsl
from torchx import specs
from torchx.pipelines.kfp.adapter import resource_from_app


@dsl.pipeline(
name="distributed-pipeline",
description="A distributed pipeline using Volcano job scheduler",
)
def pipeline() -> None:
# First we define our AppDef for the component, we set
# First we define our AppDef for the component
echo_app = specs.AppDef(
name="test-dist",
roles=[
@@ -36,31 +40,39 @@ def pipeline() -> None:
],
)

# To convert the TorchX AppDef into a KFP container we use
# the resource_from_app adapter. This takes generates a KFP Kubernetes
# resource operator definition from the TorchX app def and instantiates it.
echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="default")
# To convert the TorchX AppDef into a KFP v2 task that creates
# a Volcano job, we use the resource_from_app adapter.
# This generates a task that uses kubectl to create the Volcano job.
echo_task = resource_from_app(echo_app, queue="default")

# Set display name for better visualization
echo_task.set_display_name("Distributed Echo Job")


# %%
# To generate the pipeline definition file we need to call into the KFP compiler
# with our pipeline function.

kfp.compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)
if __name__ == "__main__":
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
print(f.read())
with open("pipeline.yaml", "rt") as f:
print(f.read())

# %%
# Once this has all run you should have a pipeline file (typically
# pipeline.yaml) that you can upload to your KFP cluster via the UI or
# a kfp.Client.
#
# Note: In KFP v2, for more advanced Kubernetes resource manipulation,
# consider using the kfp-kubernetes extension library which provides
# better integration with Kubernetes resources (see the sketch below).
#
# See the
# `KFP SDK Examples <https://www.kubeflow.org/docs/components/pipelines/legacy-v1/tutorials/sdk-examples/#examples>`_
# `KFP SDK Examples <https://www.kubeflow.org/docs/components/pipelines/user-guides/core-functions/create-a-pipeline-run/>`_
# for more info on launching KFP pipelines.

# %%
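A minimal sketch of the kfp-kubernetes extension mentioned above, assuming kfp-kubernetes>=1.4.0 (as pinned in dev-requirements.txt) is installed; the component body, PVC name, and mount path are illustrative assumptions.

from kfp import dsl, kubernetes

@dsl.component
def train() -> str:
    # Placeholder training step; a real component would write
    # checkpoints to the mounted volume.
    return "done"

@dsl.pipeline(name="pvc-example")
def pvc_pipeline() -> None:
    task = train()
    # Mount an existing PersistentVolumeClaim into the task's pod.
    kubernetes.mount_pvc(
        task,
        pvc_name="model-store",  # hypothetical PVC
        mount_path="/mnt/models",
    )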