Skip to content

Commit 9814e75

Browse files
authored
schedulers parameterized
Differential Revision: D71023681 Pull Request resolved: #1022
1 parent 24550f4 commit 9814e75

14 files changed: +59 additions, −30 deletions

torchx/schedulers/api.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,11 @@ def __hash__(self) -> int:
9494

9595

9696
T = TypeVar("T")
97+
A = TypeVar("A")
98+
D = TypeVar("D")
9799

98100

99-
class Scheduler(abc.ABC, Generic[T]):
101+
class Scheduler(abc.ABC, Generic[T, A, D]):
100102
"""
101103
An interface abstracting functionalities of a scheduler.
102104
Implementers need only implement those methods annotated with
@@ -126,7 +128,7 @@ def close(self) -> None:
126128

127129
def submit(
128130
self,
129-
app: AppDef,
131+
app: A,
130132
cfg: T,
131133
workspace: Optional[str] = None,
132134
) -> str:
@@ -150,7 +152,7 @@ def submit(
150152
return self.schedule(dryrun_info)
151153

152154
@abc.abstractmethod
153-
def schedule(self, dryrun_info: AppDryRunInfo) -> str:
155+
def schedule(self, dryrun_info: D) -> str:
154156
"""
155157
Same as ``submit`` except that it takes an ``AppDryRunInfo``.
156158
Implementers are encouraged to implement this method rather than
@@ -166,7 +168,7 @@ def schedule(self, dryrun_info: AppDryRunInfo) -> str:
166168

167169
raise NotImplementedError()
168170

169-
def submit_dryrun(self, app: AppDef, cfg: T) -> AppDryRunInfo:
171+
def submit_dryrun(self, app: A, cfg: T) -> D:
170172
"""
171173
Rather than submitting the request to run the app, returns the
172174
request object that would have been submitted to the underlying
@@ -179,14 +181,16 @@ def submit_dryrun(self, app: AppDef, cfg: T) -> AppDryRunInfo:
179181
resolved_cfg = self.run_opts().resolve(cfg)
180182
# pyre-fixme: _submit_dryrun takes Generic type for resolved_cfg
181183
dryrun_info = self._submit_dryrun(app, resolved_cfg)
182-
for role in app.roles:
183-
dryrun_info = role.pre_proc(self.backend, dryrun_info)
184-
dryrun_info._app = app
185-
dryrun_info._cfg = resolved_cfg
184+
185+
if isinstance(app, AppDef):
186+
for role in app.roles:
187+
dryrun_info = role.pre_proc(self.backend, dryrun_info)
188+
dryrun_info._app = app
189+
dryrun_info._cfg = resolved_cfg
186190
return dryrun_info
187191

188192
@abc.abstractmethod
189-
def _submit_dryrun(self, app: AppDef, cfg: T) -> AppDryRunInfo:
193+
def _submit_dryrun(self, app: A, cfg: T) -> D:
190194
raise NotImplementedError()
191195

192196
def run_opts(self) -> runopts:
@@ -345,18 +349,19 @@ def _pre_build_validate(self, app: AppDef, scheduler: str, cfg: T) -> None:
345349
"""
346350
pass
347351

348-
def _validate(self, app: AppDef, scheduler: str, cfg: T) -> None:
352+
def _validate(self, app: A, scheduler: str, cfg: T) -> None:
349353
"""
350354
Validates after workspace build whether application is consistent with the scheduler.
351355
352356
Raises error if application is not compatible with scheduler
353357
"""
354-
for role in app.roles:
355-
if role.resource == NULL_RESOURCE:
356-
raise ValueError(
357-
f"No resource for role: {role.image}."
358-
f" Did you forget to attach resource to the role"
359-
)
358+
if isinstance(app, AppDef):
359+
for role in app.roles:
360+
if role.resource == NULL_RESOURCE:
361+
raise ValueError(
362+
f"No resource for role: {role.image}."
363+
f" Did you forget to attach resource to the role"
364+
)
360365

361366

362367
def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:

torchx/schedulers/aws_batch_scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,9 @@ class AWSBatchOpts(TypedDict, total=False):
363363
execution_role_arn: Optional[str]
364364

365365

366-
class AWSBatchScheduler(DockerWorkspaceMixin, Scheduler[AWSBatchOpts]):
366+
class AWSBatchScheduler(
367+
DockerWorkspaceMixin, Scheduler[AWSBatchOpts, AppDef, AppDryRunInfo[BatchJob]]
368+
):
367369
"""
368370
AWSBatchScheduler is a TorchX scheduling interface to AWS Batch.
369371

torchx/schedulers/aws_sagemaker_scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ def _merge_ordered(
156156
return merged
157157

158158

159-
class AWSSageMakerScheduler(DockerWorkspaceMixin, Scheduler[AWSSageMakerOpts]): # type: ignore[misc]
159+
class AWSSageMakerScheduler(
160+
DockerWorkspaceMixin,
161+
Scheduler[AWSSageMakerOpts, AppDef, AppDryRunInfo[AWSSageMakerJob]],
162+
):
160163
"""
161164
AWSSageMakerScheduler is a TorchX scheduling interface to AWS SageMaker.
162165

torchx/schedulers/docker_scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ class DockerOpts(TypedDict, total=False):
128128
privileged: bool
129129

130130

131-
class DockerScheduler(DockerWorkspaceMixin, Scheduler[DockerOpts]):
131+
class DockerScheduler(
132+
DockerWorkspaceMixin, Scheduler[DockerOpts, AppDef, AppDryRunInfo[DockerJob]]
133+
):
132134
"""
133135
DockerScheduler is a TorchX scheduling interface to Docker.
134136

torchx/schedulers/gcp_batch_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class GCPBatchOpts(TypedDict, total=False):
104104
location: Optional[str]
105105

106106

107-
class GCPBatchScheduler(Scheduler[GCPBatchOpts]):
107+
class GCPBatchScheduler(Scheduler[GCPBatchOpts, AppDef, AppDryRunInfo[GCPBatchJob]]):
108108
"""
109109
GCPBatchScheduler is a TorchX scheduling interface to GCP Batch.
110110

torchx/schedulers/kubernetes_mcad_scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,9 @@ class KubernetesMCADOpts(TypedDict, total=False):
796796
network: Optional[str]
797797

798798

799-
class KubernetesMCADScheduler(DockerWorkspaceMixin, Scheduler[KubernetesMCADOpts]):
799+
class KubernetesMCADScheduler(
800+
DockerWorkspaceMixin, Scheduler[KubernetesMCADOpts, AppDef, AppDryRunInfo]
801+
):
800802
"""
801803
KubernetesMCADScheduler is a TorchX scheduling interface to Kubernetes.
802804

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,10 @@ class KubernetesOpts(TypedDict, total=False):
472472
priority_class: Optional[str]
473473

474474

475-
class KubernetesScheduler(DockerWorkspaceMixin, Scheduler[KubernetesOpts]):
475+
class KubernetesScheduler(
476+
DockerWorkspaceMixin,
477+
Scheduler[KubernetesOpts, AppDef, AppDryRunInfo[KubernetesJob]],
478+
):
476479
"""
477480
KubernetesScheduler is a TorchX scheduling interface to Kubernetes.
478481

torchx/schedulers/local_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ def _register_termination_signals() -> None:
529529
signal.signal(signal.SIGINT, _terminate_process_handler)
530530

531531

532-
class LocalScheduler(Scheduler[LocalOpts]):
532+
class LocalScheduler(Scheduler[LocalOpts, AppDef, AppDryRunInfo[PopenRequest]]):
533533
"""
534534
Schedules on localhost. Containers are modeled as processes and
535535
certain properties of the container that are either not relevant

torchx/schedulers/lsf_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ def __repr__(self) -> str:
395395
{self.materialize()}"""
396396

397397

398-
class LsfScheduler(Scheduler[LsfOpts]):
398+
class LsfScheduler(Scheduler[LsfOpts, AppDef, AppDryRunInfo]):
399399
"""
400400
**Example: hello_world**
401401

torchx/schedulers/ray_scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ class RayJob:
114114
requirements: Optional[str] = None
115115
actors: List[RayActor] = field(default_factory=list)
116116

117-
class RayScheduler(TmpDirWorkspaceMixin, Scheduler[RayOpts]):
117+
class RayScheduler(
118+
TmpDirWorkspaceMixin, Scheduler[RayOpts, AppDef, AppDryRunInfo[RayJob]]
119+
):
118120
"""
119121
RayScheduler is a TorchX scheduling interface to Ray. The job def
120122
workers will be launched as Ray actors

0 commit comments

Comments (0)