Skip to content

Commit 6ee822c

Browse files
feat: add by_dlrover_run_cmd() shortcut for Ray backend (#1687)
* feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend * feat: add by_dlrover_run_cmd() shortcut for Ray backend --------- Co-authored-by: Tianyi Chen <chentianyi.cty@antfin.com>
1 parent 1f3983e commit 6ee822c

File tree

4 files changed

+176
-2
lines changed

4 files changed

+176
-2
lines changed

dlrover/python/unified/api/builder/base.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727

2828
from pydantic import Field, model_validator
2929

30+
import shlex
31+
32+
from torch.distributed.run import get_args_parser
33+
3034
from dlrover.python.unified.common.config import (
3135
DLConfig,
3236
JobConfig,
@@ -329,6 +333,33 @@ def _build_role(self) -> Dict[str, WorkloadDesc]:
329333
}
330334

331335

336+
def parse_run_cmd_argument(launcher, args):
337+
if launcher not in ["dlrover-run", "torchrun"]:
338+
raise ValueError(
339+
f"Only 'dlrover-run' and 'torchrun' command is supported, got '{launcher}'"
340+
)
341+
342+
if launcher == "torchrun":
343+
parser = get_args_parser()
344+
args = parser.parse_args(args)
345+
else:
346+
parser = get_args_parser()
347+
348+
# deprecated arguments
349+
parser.add_argument(
350+
"--node_check",
351+
"--node-check",
352+
"--network-check",
353+
"--network_check",
354+
action="store_true",
355+
help="Whether to check node before starting training process.",
356+
)
357+
parser.allow_abbrev = False
358+
args = parser.parse_args(args)
359+
360+
return args
361+
362+
332363
class DLJobBuilder(object):
333364
def __init__(self):
334365
# Dummy object to hold parameters, use default if not assigned.
@@ -555,3 +586,46 @@ def with_collocation_all(self, *exclude_roles):
555586
roles.add(role)
556587
self._collocations.append(roles)
557588
return self
589+
590+
def by_dlrover_run_cmd(self, cmd: str):
591+
"""
592+
Automatically build DLJob from dlrover run command.
593+
Args:
594+
cmd: The dlrover run command string to build the job.
595+
e.g.
596+
"dlrover-run --nnodes=2 --nproc_per_node=2 ./dlrover/python/unified/tests/integration_test/dummy_run.py"
597+
598+
cmd contains the parameters:
599+
--nnodes: number of nodes
600+
--nproc_per_node: number of processes per node
601+
--node_check: Whether to check node before starting training process.
602+
entrypoint: the training script path with args
603+
"""
604+
parts = shlex.split(cmd.strip())
605+
launcher = parts[0] # dlrover-run or torchrun
606+
args = parts[1:]
607+
608+
args = parse_run_cmd_argument(launcher, args)
609+
610+
if launcher == "dlrover-run" and not args.node_check:
611+
self = self.skip_node_check()
612+
613+
node_num = int(args.nnodes)
614+
device_per_node = int(args.nproc_per_node)
615+
nnodes = int(args.nnodes)
616+
nproc_per_node = int(args.nproc_per_node)
617+
training_script = args.training_script
618+
for arg in args.training_script_args:
619+
training_script += " " + arg
620+
621+
return (
622+
self.node_num(node_num)
623+
.device_per_node(device_per_node)
624+
.device_type("CPU")
625+
.config({"c1": "v1"})
626+
.global_env({"eα": "ve", "DLROVER_LOG_LEVEL": "DEBUG"})
627+
.train(training_script)
628+
.nnodes(nnodes)
629+
.nproc_per_node(nproc_per_node)
630+
.end()
631+
)

dlrover/python/unified/tests/api/test_builder.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
RLRoleType,
3030
)
3131
from dlrover.python.unified.tests.base import BaseTest
32+
import os
3233

3334

3435
class ApiTest(BaseTest):
@@ -118,6 +119,58 @@ def test_basic(self):
118119

119120
self.assertEqual(len(rl_job.workloads), 6)
120121

122+
def test_by_dlrover_run_cmd(self):
123+
root_dir = os.path.dirname(
124+
os.path.dirname(
125+
os.path.dirname(
126+
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
127+
)
128+
)
129+
)
130+
cmd = f"dlrover-run --nnodes=2 --nproc_per_node=2 --node_check {root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0"
131+
132+
dl_job = DLJobBuilder().by_dlrover_run_cmd(cmd).build()
133+
134+
for workload in dl_job.workloads.values():
135+
if workload.backend == "elastic":
136+
self.assertEqual(workload.comm_pre_check, True)
137+
138+
self.assertEqual(dl_job.node_num, 2)
139+
self.assertEqual(dl_job.device_per_node, 2)
140+
workload = dl_job.workloads["ELASTIC"]
141+
self.assertEqual(
142+
workload.entry_point,
143+
f"{root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0",
144+
)
145+
self.assertEqual(workload.total, 4) # nnodes * nproc_per_node
146+
147+
# test unspported cases
148+
with self.assertRaises(ValueError):
149+
DLJobBuilder().by_dlrover_run_cmd(
150+
"unsupported-run --nnodes=1 train.py"
151+
)
152+
153+
def test_by_torchrun_cmd(self):
154+
root_dir = os.path.dirname(
155+
os.path.dirname(
156+
os.path.dirname(
157+
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
158+
)
159+
)
160+
)
161+
cmd = f"torchrun --nnodes=2 --nproc_per_node=2 {root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0"
162+
163+
dl_job = DLJobBuilder().by_dlrover_run_cmd(cmd).build()
164+
165+
self.assertEqual(dl_job.node_num, 2)
166+
self.assertEqual(dl_job.device_per_node, 2)
167+
workload = dl_job.workloads["ELASTIC"]
168+
self.assertEqual(
169+
workload.entry_point,
170+
f"{root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0",
171+
)
172+
self.assertEqual(workload.total, 4) # nnodes * nproc_per_node
173+
121174
def test_extra_flag(self):
122175
job = (
123176
DLJobBuilder()

dlrover/python/unified/tests/integration_test/elastic_training_test.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,40 @@ def test_api_full(tmp_ray):
6969
assert ret == 0, "Job should succeed"
7070

7171

72+
@pytest.mark.timeout(40, func_only=True)
73+
def test_api_full_by_dlrover_run_cmd(tmp_ray):
74+
root_dir = os.path.dirname(
75+
os.path.dirname(
76+
os.path.dirname(
77+
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
78+
)
79+
)
80+
)
81+
cmd = f"dlrover-run --nnodes=2 --nproc_per_node=2 {root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0"
82+
83+
dl_job = DLJobBuilder().by_dlrover_run_cmd(cmd).build()
84+
85+
ret = dl_job.submit("test_cmd_api", master_cpu=1, master_memory=128)
86+
assert ret == 0, "Job submitted via by_dlrover_run_cmd should succeed"
87+
88+
89+
@pytest.mark.timeout(40, func_only=True)
90+
def test_api_full_by_torchrun_cmd(tmp_ray):
91+
root_dir = os.path.dirname(
92+
os.path.dirname(
93+
os.path.dirname(
94+
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
95+
)
96+
)
97+
)
98+
cmd = f"torchrun --nnodes=2 --nproc_per_node=2 {root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0"
99+
100+
dl_job = DLJobBuilder().by_dlrover_run_cmd(cmd).build()
101+
102+
ret = dl_job.submit("test_cmd_api", master_cpu=1, master_memory=128)
103+
assert ret == 0, "Job submitted via by_dlrover_run_cmd should succeed"
104+
105+
72106
@pytest.mark.timeout(40, func_only=True) # 25s in ci
73107
def test_api_full_with_cmd(tmp_ray):
74108
root_dir = os.path.dirname(

docs/tutorial/unified/02-unified-api-guide.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# 02. Unified API Guide [Experimental]
1+
# 02. Unified API Guide [Experimental]
22

33
This section focuses on the DLJobBuilder and submission patterns: how to
44
construct job configurations programmatically and submit them to the
@@ -36,7 +36,19 @@ job = (
3636

3737
job.submit(job_name="nanogpt")
3838
```
39+
- Single role(via CLI command): You can also initialize a single-role job by directly parsing a dlrover-run or
40+
torchrun command string. This automatically configures nnodes, nproc_per_node,
41+
and the training entrypoint.
42+
```python
43+
from dlrover.python.unified.api.builder import DLJobBuilder
44+
45+
# Conveniently convert a CLI command into a Ray job
46+
cmd = f"dlrover-run --nnodes=1 --nproc_per_node=1 {Your_dlrover_root_dir}/dlrover/python/unified/tests/integration_test/dummy_run.py --test 0"
3947

48+
job = DLJobBuilder().by_dlrover_run_cmd(cmd).build()
49+
50+
job.submit("test_cmd_api", master_cpu=1, master_memory=128)
51+
```
4052
### Advanced examples
4153

4254
- Multiple roles(outline):
@@ -87,7 +99,8 @@ version.)
8799
- role(str): defines the role name for multi-role jobs.
88100
- run(entrypoint): define a non-training workload with entrypoint, and return a sub builder.
89101
- workload(role, entrypoint): single method combine role + run
90-
- train(entrypoint): define a training workload with entrypoint (module path + function or command with python file), and return a sub builder.
102+
- train(entrypoint): define a training workload with entrypoint (module path + function or command with python file), and return a sub builder.
103+
- by_dlrover_run_cmd(command_str): Parses a dlrover-run or torchrun command to set up a single-role training job.
91104

92105
### Workload / Role patterns
93106

0 commit comments

Comments
 (0)