Skip to content

Commit c855d63

Browse files
author
prekshivyas
committed
refactor data mover: switch to BatchJob with auto cleanup and sleep after every run
Signed-off-by: prekshivyas <[email protected]>
1 parent 5cfcd7c commit c855d63

File tree

1 file changed

+44
-19
lines changed

1 file changed

+44
-19
lines changed

nemo_run/core/execution/lepton.py

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,12 @@
1313
from invoke.context import Context
1414
from leptonai.api.v2.client import APIClient
1515
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
16-
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
16+
from leptonai.api.v1.types.common import Metadata, LeptonVisibility
1717
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
1818
from leptonai.api.v1.types.deployment import (
1919
EnvVar,
2020
LeptonContainer,
21-
LeptonDeployment,
22-
LeptonDeploymentUserSpec,
2321
Mount,
24-
ResourceRequirement,
2522
)
2623
from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec
2724
from leptonai.api.v1.types.replica import Replica
@@ -85,7 +82,7 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li
8582
full_command = ["sh", "-c", cmd]
8683
return full_command
8784

88-
def move_data(self, sleep: float = 10) -> None:
85+
def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5) -> None:
8986
"""
9087
Moves job directory into remote storage and deletes the workload after completion.
9188
"""
@@ -94,34 +91,62 @@ def move_data(self, sleep: float = 10) -> None:
9491
node_group_id = self._node_group_id(client)
9592
valid_node_ids = self._valid_node_ids(node_group_id, client)
9693

97-
spec = LeptonDeploymentUserSpec(
98-
container=LeptonContainer(
99-
image="busybox:1.37.0", # Use a very low resource container
100-
command=cmd,
101-
),
102-
mounts=[Mount(**mount) for mount in self.mounts],
103-
)
104-
spec.resource_requirement = ResourceRequirement(
94+
job_spec = LeptonJobUserSpec(
10595
resource_shape="cpu.small",
10696
affinity=LeptonResourceAffinity(
10797
allowed_dedicated_node_groups=[node_group_id.metadata.id_],
10898
allowed_nodes_in_node_group=valid_node_ids,
10999
),
110-
min_replicas=1,
111-
max_replicas=1,
100+
container=LeptonContainer(
101+
image="busybox:1.37.0",
102+
command=cmd,
103+
),
104+
completions=1,
105+
parallelism=1,
106+
mounts=[Mount(**mount) for mount in self.mounts],
107+
ttl_seconds_after_finished=600,
112108
)
109+
113110
custom_name = f"data-mover-{int(datetime.now().timestamp())}"
114111

115-
deployment = LeptonDeployment(
112+
job = LeptonJob(
116113
metadata=Metadata(
117114
id=custom_name,
118115
name=custom_name,
119116
visibility=LeptonVisibility("private"),
120117
),
121-
spec=spec,
118+
spec=job_spec,
122119
)
123120

124-
client.deployment.create(deployment)
121+
response = client.job.create(job)
122+
job_id = response.metadata.id_
123+
124+
start_time = time.time()
125+
count = 0
126+
127+
while True:
128+
if time.time() - start_time > timeout:
129+
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.")
130+
current_job = client.job.get(job_id)
131+
current_job_status = current_job.status.state
132+
if count > 0 and current_job_status in [LeptonJobState.Completed, LeptonJobState.Failed, LeptonJobState.Unknown]:
133+
break
134+
count+=1
135+
time.sleep(poll_interval)
136+
137+
if current_job_status != LeptonJobState.Completed:
138+
raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}")
139+
140+
time.sleep(sleep)
141+
142+
delete_success = client.job.delete(job_id)
143+
144+
if delete_success:
145+
logging.info(f"Successfully deleted job {job_id}")
146+
else:
147+
logging.error(f"Failed to delete job {job_id}")
148+
149+
125150

126151
def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
127152
"""
@@ -231,7 +256,7 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
231256
f.write(launch_script)
232257

233258
logger.info("Copying experiment directory to remote filesystem")
234-
self.move_data()
259+
self.move_data(sleep=60)
235260

236261
logger.info("Creating distributed workload")
237262
job = self.create_lepton_job(name)

0 commit comments

Comments
 (0)