Skip to content

Commit 595c436

Browse files
authored
Add tests for schedulers (#188)
* Add tests for schedulers Signed-off-by: Hemil Desai <[email protected]> * fix Signed-off-by: Hemil Desai <[email protected]> --------- Signed-off-by: Hemil Desai <[email protected]>
1 parent a3c07ce commit 595c436

File tree

4 files changed

+260
-1
lines changed

4 files changed

+260
-1
lines changed

nemo_run/run/torchx_backend/schedulers/dgxcloud.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ def _cancel_existing(self, app_id: str) -> None:
184184
executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore
185185
if not executor:
186186
return None
187-
executor.delete(job_id)
187+
executor.cancel(job_id)
188188

189189
def list(self) -> list[ListAppResponse]: ...
190190

test/run/torchx_backend/schedulers/test_dgxcloud.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,87 @@ def test_dgx_cloud_scheduler_methods(dgx_cloud_scheduler):
7474
assert hasattr(dgx_cloud_scheduler, "describe")
7575
assert hasattr(dgx_cloud_scheduler, "_cancel_existing")
7676
assert hasattr(dgx_cloud_scheduler, "_validate")
77+
78+
79+
def test_schedule(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor):
80+
with (
81+
mock.patch.object(DGXCloudExecutor, "package") as mock_package,
82+
mock.patch.object(DGXCloudExecutor, "launch") as mock_launch,
83+
):
84+
mock_package.return_value = None
85+
mock_launch.return_value = ("test_job_id", "RUNNING")
86+
87+
# Set job_name and experiment_id on executor
88+
dgx_cloud_executor.job_name = "test_job"
89+
dgx_cloud_executor.experiment_id = "test_experiment"
90+
91+
dryrun_info = dgx_cloud_scheduler._submit_dryrun(mock_app_def, dgx_cloud_executor)
92+
app_id = dgx_cloud_scheduler.schedule(dryrun_info)
93+
94+
assert app_id == "test_experiment___test_role___test_job_id"
95+
mock_package.assert_called_once()
96+
mock_launch.assert_called_once()
97+
98+
99+
def test_describe(dgx_cloud_scheduler, dgx_cloud_executor):
100+
with (
101+
mock.patch(
102+
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
103+
) as mock_get_job_dirs,
104+
mock.patch.object(DGXCloudExecutor, "status") as mock_status,
105+
):
106+
mock_get_job_dirs.return_value = {
107+
"test_experiment___test_role___test_job_id": {
108+
"job_status": "RUNNING",
109+
"executor": dgx_cloud_executor,
110+
}
111+
}
112+
mock_status.return_value = "RUNNING"
113+
114+
response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id")
115+
assert response is not None
116+
assert response.app_id == "test_experiment___test_role___test_job_id"
117+
assert len(response.roles) == 1
118+
assert response.roles[0].name == "test_role"
119+
120+
121+
def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor):
122+
with (
123+
mock.patch(
124+
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
125+
) as mock_get_job_dirs,
126+
mock.patch.object(DGXCloudExecutor, "cancel") as mock_cancel,
127+
):
128+
mock_get_job_dirs.return_value = {
129+
"test_experiment___test_role___test_job_id": {
130+
"job_status": "RUNNING",
131+
"executor": dgx_cloud_executor,
132+
}
133+
}
134+
135+
dgx_cloud_scheduler._cancel_existing("test_experiment___test_role___test_job_id")
136+
mock_cancel.assert_called_once_with("test_job_id")
137+
138+
139+
def test_save_and_get_job_dirs():
140+
with tempfile.TemporaryDirectory() as temp_dir:
141+
from nemo_run.config import set_nemorun_home
142+
143+
set_nemorun_home(temp_dir)
144+
145+
from nemo_run.run.torchx_backend.schedulers.dgxcloud import _get_job_dirs, _save_job_dir
146+
147+
executor = DGXCloudExecutor(
148+
base_url="https://test.com",
149+
app_id="test_id",
150+
app_secret="test_secret",
151+
project_name="test_project",
152+
container_image="test:latest",
153+
job_dir=temp_dir,
154+
)
155+
156+
_save_job_dir("test_app_id", "RUNNING", executor)
157+
job_dirs = _get_job_dirs()
158+
159+
assert "test_app_id" in job_dirs
160+
assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor)

test/run/torchx_backend/schedulers/test_docker.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
from nemo_run.core.execution.docker import DockerExecutor
2424
from nemo_run.run.torchx_backend.schedulers.docker import (
25+
DockerContainer,
26+
DockerJobRequest,
2527
PersistentDockerScheduler,
2628
create_scheduler,
2729
)
@@ -77,3 +79,123 @@ def test_docker_scheduler_methods(docker_scheduler):
7779
assert hasattr(docker_scheduler, "describe")
7880
assert hasattr(docker_scheduler, "log_iter")
7981
assert hasattr(docker_scheduler, "close")
82+
83+
84+
def test_schedule(docker_scheduler, mock_app_def, docker_executor):
85+
with (
86+
mock.patch.object(DockerExecutor, "package") as mock_package,
87+
mock.patch.object(DockerContainer, "run") as mock_run,
88+
):
89+
mock_package.return_value = None
90+
mock_run.return_value = ("test_container_id", "RUNNING")
91+
92+
# Set job_name on executor
93+
docker_executor.job_name = "test_job"
94+
95+
dryrun_info = docker_scheduler._submit_dryrun(mock_app_def, docker_executor)
96+
docker_scheduler.schedule(dryrun_info)
97+
98+
mock_package.assert_called_once()
99+
mock_run.assert_called_once()
100+
101+
102+
def test_describe(docker_scheduler, docker_executor):
103+
with (
104+
mock.patch.object(DockerJobRequest, "load") as mock_load,
105+
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
106+
):
107+
mock_load.return_value = DockerJobRequest(
108+
id="test_session___test_role___test_container_id",
109+
executor=docker_executor,
110+
containers=[
111+
DockerContainer(
112+
name="test_role",
113+
command=["test"],
114+
executor=docker_executor,
115+
extra_env={},
116+
)
117+
],
118+
)
119+
mock_get_container.return_value = None
120+
121+
response = docker_scheduler.describe("test_session___test_role___test_container_id")
122+
assert response is not None
123+
assert response.app_id == "test_session___test_role___test_container_id"
124+
assert "SUCCEEDED" in str(response.state)
125+
assert len(response.roles) == 1
126+
127+
128+
def test_save_and_get_job_dirs():
129+
with tempfile.TemporaryDirectory() as temp_dir:
130+
from nemo_run.config import set_nemorun_home
131+
132+
set_nemorun_home(temp_dir)
133+
134+
from nemo_run.run.torchx_backend.schedulers.docker import DockerJobRequest
135+
136+
executor = DockerExecutor(
137+
container_image="test:latest",
138+
job_dir=temp_dir,
139+
)
140+
141+
req = DockerJobRequest(
142+
id="test_app_id",
143+
executor=executor,
144+
containers=[
145+
DockerContainer(
146+
name="test_role",
147+
command=["test"],
148+
executor=executor,
149+
extra_env={},
150+
)
151+
],
152+
)
153+
req.save()
154+
155+
loaded_req = DockerJobRequest.load("test_app_id")
156+
assert loaded_req is not None
157+
assert loaded_req.id == "test_app_id"
158+
assert isinstance(loaded_req.executor, DockerExecutor)
159+
160+
161+
def test_run_opts(docker_scheduler):
162+
opts = docker_scheduler._run_opts()
163+
assert "copy_env" in str(opts)
164+
assert "env" in str(opts)
165+
assert "privileged" in str(opts)
166+
167+
168+
def test_log_iter(docker_scheduler, docker_executor):
169+
with (
170+
mock.patch.object(DockerJobRequest, "load") as mock_load,
171+
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
172+
):
173+
mock_load.return_value = DockerJobRequest(
174+
id="test_session___test_role___test_container_id",
175+
executor=docker_executor,
176+
containers=[
177+
DockerContainer(
178+
name="test_role",
179+
command=["test"],
180+
executor=docker_executor,
181+
extra_env={},
182+
)
183+
],
184+
)
185+
container_mock = mock.Mock()
186+
container_mock.logs = mock.Mock(return_value=["log1", "log2"])
187+
mock_get_container.return_value = container_mock
188+
189+
logs = list(
190+
docker_scheduler.log_iter("test_session___test_role___test_container_id", "test_role")
191+
)
192+
assert logs == ["log1", "log2"]
193+
assert mock_get_container.call_count == 1
194+
assert container_mock.logs.call_count == 1
195+
196+
197+
def test_close(docker_scheduler):
198+
with mock.patch.object(DockerContainer, "delete") as mock_delete:
199+
docker_scheduler._scheduled_reqs = [] # No requests to clean up
200+
docker_scheduler.close()
201+
mock_delete.assert_not_called() # No cleanup needed since no requests

test/run/torchx_backend/schedulers/test_skypilot.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
# limitations under the License.
1515

1616
import tempfile
17+
from unittest import mock
1718

1819
import pytest
20+
from torchx.schedulers.api import AppDryRunInfo
1921
from torchx.specs import AppDef, Role
2022

2123
from nemo_run.core.execution.skypilot import SkypilotExecutor
@@ -57,3 +59,54 @@ def test_skypilot_scheduler_methods(skypilot_scheduler):
5759
assert hasattr(skypilot_scheduler, "schedule")
5860
assert hasattr(skypilot_scheduler, "describe")
5961
assert hasattr(skypilot_scheduler, "_validate")
62+
63+
64+
def test_submit_dryrun(skypilot_scheduler, mock_app_def, skypilot_executor):
65+
with mock.patch.object(SkypilotExecutor, "package") as mock_package:
66+
mock_package.return_value = None
67+
68+
dryrun_info = skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor)
69+
assert isinstance(dryrun_info, AppDryRunInfo)
70+
assert dryrun_info.request is not None
71+
72+
73+
def test_schedule(skypilot_scheduler, mock_app_def, skypilot_executor):
74+
class MockHandle:
75+
def get_cluster_name(self):
76+
return "test_cluster_name"
77+
78+
with (
79+
mock.patch.object(SkypilotExecutor, "package") as mock_package,
80+
mock.patch.object(SkypilotExecutor, "launch") as mock_launch,
81+
):
82+
mock_package.return_value = None
83+
mock_launch.return_value = (123, MockHandle())
84+
85+
# Set job_name and experiment_id on executor
86+
skypilot_executor.job_name = "test_job"
87+
skypilot_executor.experiment_id = "test_session"
88+
89+
dryrun_info = skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor)
90+
app_id = skypilot_scheduler.schedule(dryrun_info)
91+
92+
assert app_id == "test_session___test_cluster_name___test_role___123"
93+
mock_package.assert_called_once()
94+
mock_launch.assert_called_once()
95+
96+
97+
def test_cancel_existing(skypilot_scheduler, skypilot_executor):
98+
with (
99+
mock.patch.object(SkypilotExecutor, "parse_app") as mock_parse_app,
100+
mock.patch.object(SkypilotExecutor, "cancel") as mock_cancel,
101+
):
102+
mock_parse_app.return_value = ("test_cluster_name", "test_role", 123)
103+
104+
skypilot_scheduler._cancel_existing("test_session___test_cluster_name___test_role___123")
105+
mock_cancel.assert_called_once_with(
106+
app_id="test_session___test_cluster_name___test_role___123"
107+
)
108+
109+
110+
def test_validate(skypilot_scheduler, mock_app_def):
111+
# Test that validation doesn't raise any errors
112+
skypilot_scheduler._validate(mock_app_def, "skypilot")

0 commit comments

Comments
 (0)