Skip to content

Commit 2683a61

Browse files
kiukchungfacebook-github-bot
authored andcommitted
(torchx/workspace) Support multi-project/directory workspace
Summary: Usage: In your `.torchxconfig` file specify a multi-project workspace as follows: ``` [cli:run] workspace = /home/$USER/github/myproj: /home/$USER/github/verl: verl /home/$USER/.torchxconfig: verl/.torchxconfig ``` Then the workspace is built as: 1. `/home/$USER/github/myproj/**` -> `$REMOTE_ROOT/**` 1. `/home/$USER/github/verl/**` -> `$REMOTE_ROOT/verl/**` 1. `/home/$USER/.torchxconfig` -> `$REMOTE_ROOT/verl/.torchxconfig` Notes: 1. The mappings are basically like BUCK `python_library`'s `resources` attribute. Mapping to an empty string, or simply leaving the target blank (as we did for `myproj` in the example above), copies the contents of the src dir directly into the remote workspace root. 1. Files can be specified as src. The target of a file can be a directory, and if so, the file will be copied into the directory. For example: ``` [cli:run] workspace = /home/$USER/github/verl: verl /home/$USER/.torchxconfig: verl ``` copies `.torchxconfig` into `$REMOTE_ROOT/verl/.torchxconfig`. However, switching the order of the projects won't work: ``` [cli:run] workspace = /home/$USER/.torchxconfig: verl /home/$USER/github/verl: verl ``` This would copy `.torchxconfig` to a file called `verl` at `$REMOTE_ROOT/verl`, and then `github/verl` would fail to copy. Differential Revision: D82169554
1 parent 9d9b5fd commit 2683a61

File tree

8 files changed

+299
-42
lines changed

8 files changed

+299
-42
lines changed

torchx/cli/cmd_run.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
)
3737
from torchx.util.log_tee_helpers import tee_logs
3838
from torchx.util.types import none_throws
39+
from torchx.workspace import Workspace
3940

4041

4142
MISSING_COMPONENT_ERROR_MSG = (
@@ -289,12 +290,13 @@ def _run_inner(self, runner: Runner, args: TorchXRunArgs) -> None:
289290
else args.component_args
290291
)
291292
try:
293+
workspace = Workspace.from_str(args.workspace) if args.workspace else None
292294
if args.dryrun:
293295
dryrun_info = runner.dryrun_component(
294296
args.component_name,
295297
component_args,
296298
args.scheduler,
297-
workspace=args.workspace,
299+
workspace=workspace,
298300
cfg=args.scheduler_cfg,
299301
parent_run_id=args.parent_run_id,
300302
)

torchx/runner/api.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
from torchx.util.session import get_session_id_or_create_new, TORCHX_INTERNAL_SESSION_ID
5555

5656
from torchx.util.types import none_throws
57-
from torchx.workspace.api import WorkspaceMixin
57+
from torchx.workspace.api import Workspace, WorkspaceMixin
5858

5959
if TYPE_CHECKING:
6060
from typing_extensions import Self
@@ -171,7 +171,7 @@ def run_component(
171171
component_args: Union[list[str], dict[str, Any]],
172172
scheduler: str,
173173
cfg: Optional[Mapping[str, CfgVal]] = None,
174-
workspace: Optional[str] = None,
174+
workspace: Optional[Union[Workspace, str]] = None,
175175
parent_run_id: Optional[str] = None,
176176
) -> AppHandle:
177177
"""
@@ -206,7 +206,7 @@ def run_component(
206206
ComponentNotFoundException: if the ``component_path`` is failed to resolve.
207207
"""
208208

209-
with log_event("run_component", workspace=workspace) as ctx:
209+
with log_event("run_component") as ctx:
210210
dryrun_info = self.dryrun_component(
211211
component,
212212
component_args,
@@ -217,7 +217,8 @@ def run_component(
217217
)
218218
handle = self.schedule(dryrun_info)
219219
app = none_throws(dryrun_info._app)
220-
ctx._torchx_event.workspace = workspace
220+
221+
ctx._torchx_event.workspace = str(workspace)
221222
ctx._torchx_event.scheduler = none_throws(dryrun_info._scheduler)
222223
ctx._torchx_event.app_image = app.roles[0].image
223224
ctx._torchx_event.app_id = parse_app_handle(handle)[2]
@@ -230,7 +231,7 @@ def dryrun_component(
230231
component_args: Union[list[str], dict[str, Any]],
231232
scheduler: str,
232233
cfg: Optional[Mapping[str, CfgVal]] = None,
233-
workspace: Optional[str] = None,
234+
workspace: Optional[Union[Workspace, str]] = None,
234235
parent_run_id: Optional[str] = None,
235236
) -> AppDryRunInfo:
236237
"""
@@ -259,7 +260,7 @@ def run(
259260
app: AppDef,
260261
scheduler: str,
261262
cfg: Optional[Mapping[str, CfgVal]] = None,
262-
workspace: Optional[str] = None,
263+
workspace: Optional[Union[Workspace, str]] = None,
263264
parent_run_id: Optional[str] = None,
264265
) -> AppHandle:
265266
"""
@@ -272,9 +273,7 @@ def run(
272273
An application handle that is used to call other action APIs on the app.
273274
"""
274275

275-
with log_event(
276-
api="run", runcfg=json.dumps(cfg) if cfg else None, workspace=workspace
277-
) as ctx:
276+
with log_event(api="run") as ctx:
278277
dryrun_info = self.dryrun(
279278
app,
280279
scheduler,
@@ -283,10 +282,15 @@ def run(
283282
parent_run_id=parent_run_id,
284283
)
285284
handle = self.schedule(dryrun_info)
286-
ctx._torchx_event.scheduler = none_throws(dryrun_info._scheduler)
287-
ctx._torchx_event.app_image = none_throws(dryrun_info._app).roles[0].image
288-
ctx._torchx_event.app_id = parse_app_handle(handle)[2]
289-
ctx._torchx_event.app_metadata = app.metadata
285+
286+
event = ctx._torchx_event
287+
event.scheduler = scheduler
288+
event.runcfg = json.dumps(cfg) if cfg else None
289+
event.workspace = str(workspace)
290+
event.app_id = parse_app_handle(handle)[2]
291+
event.app_image = none_throws(dryrun_info._app).roles[0].image
292+
event.app_metadata = app.metadata
293+
290294
return handle
291295

292296
def schedule(self, dryrun_info: AppDryRunInfo) -> AppHandle:
@@ -320,21 +324,22 @@ def schedule(self, dryrun_info: AppDryRunInfo) -> AppHandle:
320324
321325
"""
322326
scheduler = none_throws(dryrun_info._scheduler)
323-
app_image = none_throws(dryrun_info._app).roles[0].image
324327
cfg = dryrun_info._cfg
325-
with log_event(
326-
"schedule",
327-
scheduler,
328-
app_image=app_image,
329-
runcfg=json.dumps(cfg) if cfg else None,
330-
) as ctx:
328+
with log_event("schedule") as ctx:
331329
sched = self._scheduler(scheduler)
332330
app_id = sched.schedule(dryrun_info)
333331
app_handle = make_app_handle(scheduler, self._name, app_id)
332+
334333
app = none_throws(dryrun_info._app)
335334
self._apps[app_handle] = app
336-
_, _, app_id = parse_app_handle(app_handle)
337-
ctx._torchx_event.app_id = app_id
335+
336+
event = ctx._torchx_event
337+
event.scheduler = scheduler
338+
event.runcfg = json.dumps(cfg) if cfg else None
339+
event.app_id = app_id
340+
event.app_image = none_throws(dryrun_info._app).roles[0].image
341+
event.app_metadata = app.metadata
342+
338343
return app_handle
339344

340345
def name(self) -> str:
@@ -345,7 +350,7 @@ def dryrun(
345350
app: AppDef,
346351
scheduler: str,
347352
cfg: Optional[Mapping[str, CfgVal]] = None,
348-
workspace: Optional[str] = None,
353+
workspace: Optional[Union[Workspace, str]] = None,
349354
parent_run_id: Optional[str] = None,
350355
) -> AppDryRunInfo:
351356
"""
@@ -414,7 +419,7 @@ def dryrun(
414419
"dryrun",
415420
scheduler,
416421
runcfg=json.dumps(cfg) if cfg else None,
417-
workspace=workspace,
422+
workspace=str(workspace),
418423
):
419424
sched = self._scheduler(scheduler)
420425
resolved_cfg = sched.run_opts().resolve(cfg)
@@ -429,7 +434,7 @@ def dryrun(
429434
logger.info(
430435
'To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.'
431436
)
432-
sched.build_workspace_and_update_role(role, workspace, resolved_cfg)
437+
sched.build_workspace_and_update_role2(role, workspace, resolved_cfg)
433438

434439
if old_img != role.image:
435440
logger.info(

torchx/runner/events/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class TorchxEvent:
2929
scheduler: Scheduler that is used to execute request
3030
api: Api name
3131
app_id: Unique id that is set by the underlying scheduler
32-
image: Image/container bundle that is used to execute request.
32+
app_image: Image/container bundle that is used to execute request.
3333
app_metadata: metadata to the app (treatment of metadata is scheduler dependent)
3434
runcfg: Run config that was used to schedule app.
3535
source: Type of source the event is generated.

torchx/runner/test/config_test.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from torchx.schedulers.api import DescribeAppResponse, ListAppResponse, Stream
2828
from torchx.specs import AppDef, AppDryRunInfo, CfgVal, runopts
2929
from torchx.test.fixtures import TestWithTmpDir
30+
from torchx.workspace import Workspace
3031

3132

3233
class TestScheduler(Scheduler):
@@ -506,3 +507,29 @@ def test_dump_and_load_all_registered_schedulers(self) -> None:
506507
opt_name in cfg,
507508
f"missing {opt_name} in {sched} run opts with cfg {cfg}",
508509
)
510+
511+
def test_get_workspace_config(self) -> None:
    # A multi-line ``workspace`` value under ``[cli:run]`` is read back with
    # ``get_config`` and parsed by ``Workspace.from_str`` into a
    # ``{local_src: remote_dst}`` mapping.
    configdir = self.tmpdir
    self.write(
        str(configdir / ".torchxconfig"),
        """#
[cli:run]
workspace =
    /home/foo/third-party/verl: verl
    /home/foo/bar/scripts/.torchxconfig: verl/.torchxconfig
    /home/foo/baz:
""",
    )

    workspace_config = get_config(
        prefix="cli", name="run", key="workspace", dirs=[str(configdir)]
    )
    workspace = Workspace.from_str(workspace_config)
    self.assertDictEqual(
        {
            "/home/foo/third-party/verl": "verl",
            "/home/foo/bar/scripts/.torchxconfig": "verl/.torchxconfig",
            # ``Workspace.from_str`` replaces a blank (``None``) target with
            # the empty string, so ``""`` -- not ``None`` -- is expected here.
            "/home/foo/baz": "",
        },
        workspace.projects,
    )

torchx/schedulers/api.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from dataclasses import dataclass, field
1313
from datetime import datetime
1414
from enum import Enum
15-
from typing import Generic, Iterable, List, Optional, TypeVar
15+
from typing import Generic, Iterable, List, Optional, TypeVar, Union
1616

1717
from torchx.specs import (
1818
AppDef,
@@ -23,7 +23,7 @@
2323
RoleStatus,
2424
runopts,
2525
)
26-
from torchx.workspace.api import WorkspaceMixin
26+
from torchx.workspace.api import Workspace, WorkspaceMixin
2727

2828

2929
DAYS_IN_2_WEEKS = 14
@@ -131,7 +131,7 @@ def submit(
131131
self,
132132
app: A,
133133
cfg: T,
134-
workspace: Optional[str] = None,
134+
workspace: Optional[Union[Workspace, str]] = None,
135135
) -> str:
136136
"""
137137
Submits the application to be run by the scheduler.
@@ -144,10 +144,9 @@ def submit(
144144
# pyre-fixme: Generic cfg type passed to resolve
145145
resolved_cfg = self.run_opts().resolve(cfg)
146146
if workspace:
147-
sched = self
148-
assert isinstance(sched, WorkspaceMixin)
149-
role = app.roles[0]
150-
sched.build_workspace_and_update_role(role, workspace, resolved_cfg)
147+
assert isinstance(self, WorkspaceMixin)
148+
self.build_workspace_and_update_role2(app.roles[0], workspace, resolved_cfg)
149+
151150
# pyre-fixme: submit_dryrun takes Generic type for resolved_cfg
152151
dryrun_info = self.submit_dryrun(app, resolved_cfg)
153152
return self.schedule(dryrun_info)
@@ -356,13 +355,14 @@ def _validate(self, app: A, scheduler: str, cfg: T) -> None:
356355
357356
Raises error if application is not compatible with scheduler
358357
"""
359-
if isinstance(app, AppDef):
360-
for role in app.roles:
361-
if role.resource == NULL_RESOURCE:
362-
raise ValueError(
363-
f"No resource for role: {role.image}."
364-
f" Did you forget to attach resource to the role"
365-
)
358+
if not isinstance(app, AppDef):
359+
return
360+
361+
for role in app.roles:
362+
if role.resource == NULL_RESOURCE:
363+
raise ValueError(
364+
f"No resource for role: {role.image}. Did you forget to attach resource to the role"
365+
)
366366

367367

368368
def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:

torchx/workspace/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@
2222
* ``memory://foo-bar/`` an in-memory workspace for notebook/programmatic usage
2323
"""
2424

25-
from torchx.workspace.api import walk_workspace, WorkspaceMixin # noqa: F401
25+
from torchx.workspace.api import walk_workspace, Workspace, WorkspaceMixin # noqa: F401

torchx/workspace/api.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
import abc
1010
import fnmatch
1111
import posixpath
12+
import shutil
13+
import tempfile
1214
import warnings
1315
from dataclasses import dataclass
16+
from pathlib import Path
1417
from typing import Any, Dict, Generic, Iterable, Mapping, Tuple, TYPE_CHECKING, TypeVar
1518

1619
from torchx.specs import AppDef, CfgVal, Role, runopts
@@ -75,6 +78,64 @@ def build_workspace(self, sync: bool = True) -> PkgInfo[PackageType]:
7578
pass
7679

7780

81+
@dataclass
class Workspace:
    """
    Specifies a local "workspace" (a set of directories). Workspaces are ad-hoc built
    into a (usually ephemeral) image. This effectively mirrors the local code changes
    at job submission time.

    For example:

    1. ``projects={"~/github/torch": "torch"}`` copies ``~/github/torch/**`` into ``$REMOTE_WORKSPACE_ROOT/torch/**``
    2. ``projects={"~/github/torch": ""}`` copies ``~/github/torch/**`` into ``$REMOTE_WORKSPACE_ROOT/**``

    The exact location of ``$REMOTE_WORKSPACE_ROOT`` is implementation dependent and varies between
    different implementations of :py:class:`~torchx.workspace.api.WorkspaceMixin`.
    Check the scheduler documentation for details on which workspace it supports.

    Note: ``projects`` maps the location of the local project to a sub-directory in the remote workspace root directory.
    Typically the local project location is a directory path (e.g. ``/home/foo/github/torch``).


    Attributes:
        projects: mapping of local project to the sub-dir in the remote workspace dir.
            An empty-string target means "the remote workspace root itself".
    """

    projects: dict[str, str]

    @staticmethod
    def from_str(workspace: str) -> "Workspace":
        """
        Parses a workspace specification into a :py:class:`Workspace`.

        ``workspace`` is loaded with ``yaml.safe_load`` and interpreted as:

        1. a plain string (e.g. ``/home/foo/proj``): a single-project workspace
           mapped to the empty target ``""``
        2. a mapping (one ``src: dst`` pair per line): a multi-project workspace,
           where a blank target is normalized to ``""``

        Args:
            workspace: the workspace specification string

        Returns:
            the parsed :py:class:`Workspace`

        Raises:
            ValueError: if ``workspace`` is empty/blank or does not parse to
                either a string or a mapping
        """
        # Reject blank input up-front: ``yaml.safe_load`` returns ``None`` for it,
        # which previously surfaced as an opaque ``AttributeError``.
        if not workspace or not workspace.strip():
            raise ValueError("workspace must be a non-empty string")

        import yaml

        parsed = yaml.safe_load(workspace)

        if isinstance(parsed, str):  # single project workspace
            return Workspace({parsed: ""})

        if not isinstance(parsed, dict):
            raise ValueError(
                f"invalid workspace `{workspace}`: expected a path or a"
                f" `src: dst` mapping, got {type(parsed).__name__}"
            )

        # multi-project workspace: replace None targets with "" and coerce
        # non-string YAML scalars (e.g. ``verl: 1``) so that ``projects``
        # honors its ``dict[str, str]`` annotation.
        return Workspace(
            {str(src): "" if dst is None else str(dst) for src, dst in parsed.items()}
        )

    def __str__(self) -> str:
        """
        Returns a string representation of the Workspace by concatenating
        the project mappings using ';' as a delimiter and ':' between key and value.
        If this is a single-project workspace with no target mapping, then simply
        returns the src (local project dir).

        NOTE: meant to be used for logging purposes not serde.
          Therefore not symmetric with :py:func:`Workspace.from_str`.

        """
        if len(self.projects) == 1 and not next(iter(self.projects.values())):
            return next(iter(self.projects))
        else:
            return ";".join(
                k if not v else f"{k}:{v}" for k, v in self.projects.items()
            )
137+
138+
78139
class WorkspaceMixin(abc.ABC, Generic[T]):
79140
"""
80141
Note: (Prototype) this interface may change without notice!
@@ -100,6 +161,37 @@ def workspace_opts(self) -> runopts:
100161
"""
101162
return runopts()
102163

164+
def build_workspace_and_update_role2(
    self, role: Role, workspace: Workspace | str, cfg: Mapping[str, CfgVal]
) -> None:
    """
    Same as :py:meth:`build_workspace_and_update_role` but operates
    on :py:class:`Workspace` (supports multi-project workspaces)
    as well as ``str`` (for backwards compatibility).

    If ``workspace`` is a ``str`` this method simply calls
    :py:meth:`build_workspace_and_update_role`.

    If ``workspace`` is :py:class:`Workspace` then the default
    impl copies all the projects into a tmp directory and passes the tmp dir to
    :py:meth:`build_workspace_and_update_role`. Projects are copied in
    mapping order; a file src is copied with ``shutil.copy2`` and a
    directory src with ``shutil.copytree(..., dirs_exist_ok=True)``.

    Subclasses can override this method to customize multi-project
    workspace building logic.

    Args:
        role: the role passed through to :py:meth:`build_workspace_and_update_role`
        workspace: a multi-project :py:class:`Workspace` or a plain ``str`` workspace
        cfg: the config passed through to :py:meth:`build_workspace_and_update_role`

    Raises:
        ValueError: if a project's target resolves to a location outside of
            the temporary staging directory (e.g. an absolute path or ``..``)
    """
    if not isinstance(workspace, Workspace):  # workspace is str
        self.build_workspace_and_update_role(role, workspace, cfg)
        return

    # ``prefix`` (not ``suffix``) so that the dir is named ``torchx_workspace_XXXX``
    with tempfile.TemporaryDirectory(prefix="torchx_workspace_") as outdir:
        # resolve so that the containment check below compares real paths
        root = Path(outdir).resolve()

        for src, dst in workspace.projects.items():
            # ``~`` is used in the documented examples; expand it explicitly
            # since neither ``pathlib`` nor ``shutil`` does so implicitly
            src_path = Path(src).expanduser()
            dst_path = (root / (dst or "")).resolve()

            # An absolute or ``..`` target would make the copy land on the
            # local filesystem instead of the staged workspace.
            try:
                dst_path.relative_to(root)
            except ValueError:
                raise ValueError(
                    f"workspace target `{dst}` (for `{src}`) must be a relative"
                    f" path inside the remote workspace root"
                ) from None

            if src_path.is_file():
                # allow nested file targets (e.g. ``verl/.torchxconfig``) even
                # when the parent dir was not created by an earlier project
                dst_path.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src_path, dst_path)
            else:  # src is dir
                shutil.copytree(src_path, dst_path, dirs_exist_ok=True)

        self.build_workspace_and_update_role(role, outdir, cfg)
194+
103195
@abc.abstractmethod
104196
def build_workspace_and_update_role(
105197
self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]

0 commit comments

Comments
 (0)