Skip to content

Commit caf3f12

Browse files
authored
refactor DGXC Lepton data mover: switch to BatchJob with auto cleanup and sleep after every run (#265)
* refactor DGXC Lepton data mover: switch to BatchJob with auto cleanup and sleep after every run Signed-off-by: prekshivyas <[email protected]>
1 parent dbddad3 commit caf3f12

File tree

2 files changed

+65
-45
lines changed

2 files changed

+65
-45
lines changed

nemo_run/core/execution/lepton.py

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,12 @@
1313
from invoke.context import Context
1414
from leptonai.api.v2.client import APIClient
1515
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
16-
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
16+
from leptonai.api.v1.types.common import Metadata, LeptonVisibility
1717
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
1818
from leptonai.api.v1.types.deployment import (
1919
EnvVar,
2020
LeptonContainer,
21-
LeptonDeployment,
22-
LeptonDeploymentUserSpec,
2321
Mount,
24-
ResourceRequirement,
2522
)
2623
from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec
2724
from leptonai.api.v1.types.replica import Replica
@@ -85,7 +82,7 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li
8582
full_command = ["sh", "-c", cmd]
8683
return full_command
8784

88-
def move_data(self, sleep: float = 10) -> None:
85+
def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5) -> None:
8986
"""
9087
Moves job directory into remote storage and deletes the workload after completion.
9188
"""
@@ -94,34 +91,60 @@ def move_data(self, sleep: float = 10) -> None:
9491
node_group_id = self._node_group_id(client)
9592
valid_node_ids = self._valid_node_ids(node_group_id, client)
9693

97-
spec = LeptonDeploymentUserSpec(
98-
container=LeptonContainer(
99-
image="busybox:1.37.0", # Use a very low resource container
100-
command=cmd,
101-
),
102-
mounts=[Mount(**mount) for mount in self.mounts],
103-
)
104-
spec.resource_requirement = ResourceRequirement(
94+
job_spec = LeptonJobUserSpec(
10595
resource_shape="cpu.small",
10696
affinity=LeptonResourceAffinity(
10797
allowed_dedicated_node_groups=[node_group_id.metadata.id_],
10898
allowed_nodes_in_node_group=valid_node_ids,
10999
),
110-
min_replicas=1,
111-
max_replicas=1,
100+
container=LeptonContainer(
101+
image="busybox:1.37.0",
102+
command=cmd,
103+
),
104+
completions=1,
105+
parallelism=1,
106+
mounts=[Mount(**mount) for mount in self.mounts],
112107
)
108+
113109
custom_name = f"data-mover-{int(datetime.now().timestamp())}"
114110

115-
deployment = LeptonDeployment(
111+
job = LeptonJob(
116112
metadata=Metadata(
117113
id=custom_name,
118114
name=custom_name,
119115
visibility=LeptonVisibility("private"),
120116
),
121-
spec=spec,
117+
spec=job_spec,
122118
)
123119

124-
client.deployment.create(deployment)
120+
response = client.job.create(job)
121+
job_id = response.metadata.id_
122+
123+
start_time = time.time()
124+
count = 0
125+
126+
while True:
127+
if time.time() - start_time > timeout:
128+
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.")
129+
current_job = client.job.get(job_id)
130+
current_job_status = current_job.status.state
131+
if count > 0 and current_job_status in [
132+
LeptonJobState.Completed,
133+
LeptonJobState.Failed,
134+
LeptonJobState.Unknown,
135+
]:
136+
break
137+
count += 1
138+
time.sleep(poll_interval)
139+
140+
if current_job_status != LeptonJobState.Completed:
141+
raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}")
142+
143+
delete_success = client.job.delete(job_id)
144+
if delete_success:
145+
logging.info(f"Successfully deleted job {job_id}")
146+
else:
147+
logging.error(f"Failed to delete job {job_id}")
125148

126149
def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
127150
"""

test/core/execution/test_lepton.py

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,10 @@
2222
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
2323
from leptonai.api.v1.types.deployment import (
2424
LeptonContainer,
25-
LeptonDeployment,
26-
LeptonDeploymentUserSpec,
2725
LeptonResourceAffinity,
2826
Mount,
29-
ResourceRequirement,
3027
)
31-
from leptonai.api.v1.types.job import LeptonJob
28+
from leptonai.api.v1.types.job import LeptonJob, LeptonJobUserSpec
3229

3330
from nemo_run.core.execution.lepton import LeptonExecutor, LeptonJobState
3431
from nemo_run.core.packaging.git import GitArchivePackager
@@ -173,26 +170,23 @@ def test_copy_directory_data_command_fails(self, mock_tempdir):
173170
@patch("nemo_run.core.execution.lepton.APIClient")
174171
def test_move_data_success(self, mock_APIClient, mock_datetime, mock_copy):
175172
mock_instance = MagicMock()
176-
mock_deployment_api = MagicMock()
177-
mock_instance.deployment = mock_deployment_api
178-
mock_deployment_api.create = MagicMock()
173+
mock_job_api = MagicMock()
174+
mock_instance.job = mock_job_api
179175
mock_copy.return_value = ["sh", "-c", "echo 'hello world'"]
180176
mock_APIClient.return_value = mock_instance
181177
mock_client = mock_APIClient.return_value
182178
mock_nodegroup = MagicMock()
183-
mock_datetime = MagicMock()
184179
mock_datetime_now = MagicMock()
185-
mock_datetime.now = mock_datetime_now
186-
mock_timestamp = MagicMock()
187-
mock_datetime_now.timestamp = MagicMock()
188-
mock_timestamp.return_value = "1"
180+
mock_datetime.now.return_value = mock_datetime_now
181+
mock_datetime_now.timestamp.return_value = 1
189182
mock_client.nodegroup = mock_nodegroup
190183
mock_nodegroup.list_all.return_value = [
191184
SimpleNamespace(metadata=SimpleNamespace(name="123456", id_="my-node-id"))
192185
]
193186
mock_nodegroup.list_nodes.return_value = [
194187
SimpleNamespace(metadata=SimpleNamespace(id_="10-10-10-10"))
195188
]
189+
mock_job_api.get.return_value = SimpleNamespace(status=SimpleNamespace(state="Completed"))
196190

197191
executor = LeptonExecutor(
198192
container_image="nvcr.io/nvidia/test:latest",
@@ -203,32 +197,35 @@ def test_move_data_success(self, mock_APIClient, mock_datetime, mock_copy):
203197

204198
executor.move_data()
205199

206-
spec = LeptonDeploymentUserSpec(
207-
container=LeptonContainer(
208-
image="busybox:1.37.0",
209-
command=["sh", "-c", "echo 'hello world'"],
210-
),
211-
mounts=[Mount(path="/workspace", mount_path="/workspace")],
212-
)
213-
spec.resource_requirement = ResourceRequirement(
200+
expected_cmd = ["sh", "-c", "echo 'hello world'"]
201+
expected_spec = LeptonJobUserSpec(
214202
resource_shape="cpu.small",
215203
affinity=LeptonResourceAffinity(
216204
allowed_dedicated_node_groups=["my-node-id"],
217205
allowed_nodes_in_node_group=["10-10-10-10"],
218206
),
219-
min_replicas=1,
220-
max_replicas=1,
207+
container=LeptonContainer(
208+
image="busybox:1.37.0",
209+
command=expected_cmd,
210+
),
211+
completions=1,
212+
parallelism=1,
213+
mounts=[Mount(path="/workspace", mount_path="/workspace")],
221214
)
215+
222216
custom_name = "data-mover-1"
223-
deployment = LeptonDeployment(
217+
expected_job = LeptonJob(
224218
metadata=Metadata(
225-
id=custom_name, name=custom_name, visibility=LeptonVisibility("private")
219+
id=custom_name,
220+
name=custom_name,
221+
visibility=LeptonVisibility("private"),
226222
),
227-
spec=spec,
223+
spec=expected_spec,
228224
)
229225

230-
mock_copy.assert_called_once_with("", "")
231-
mock_deployment_api.create.assert_called_once_with(deployment)
226+
mock_copy.assert_called_once_with(executor.job_dir, executor.lepton_job_dir)
227+
mock_job_api.create.assert_called_once_with(expected_job)
228+
mock_job_api.delete.assert_called_once_with(mock_job_api.create.return_value.metadata.id_)
232229

233230
def test_node_group_id(self):
234231
mock_client = MagicMock(

0 commit comments

Comments
 (0)