Skip to content

Commit 19f7cab

Browse files
prekshivyas and prekshivyas authored
add a grace for Jobs that may start in Unknown (#291)
* add a grace for Jobs that may start in Unknown
  Signed-off-by: Prekshi Vyas <[email protected]>
* add a grace for Jobs that may start in Unknown
  Signed-off-by: Prekshi Vyas <[email protected]>
* add a grace for Jobs that may start in Unknown
  Signed-off-by: Prekshi Vyas <[email protected]>
* fix linting
  Signed-off-by: Prekshi Vyas <[email protected]>
* make the handling of Unknown job status better by polling
  Signed-off-by: prekshivyas <[email protected]>
---------
Signed-off-by: Prekshi Vyas <[email protected]>
Signed-off-by: prekshivyas <[email protected]>
Co-authored-by: prekshivyas <[email protected]>
1 parent f00ef04 commit 19f7cab

File tree

1 file changed

+33
-8
lines changed

1 file changed

+33
-8
lines changed

nemo_run/core/execution/lepton.py

Lines changed: 33 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,13 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li
8383
full_command = ["sh", "-c", cmd]
8484
return full_command
8585

86-
def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5) -> None:
86+
def move_data(
87+
self,
88+
sleep: float = 10,
89+
timeout: int = 600,
90+
poll_interval: int = 5,
91+
unknowns_grace_period: int = 60,
92+
) -> None:
8793
"""
8894
Moves job directory into remote storage and deletes the workload after completion.
8995
"""
@@ -122,20 +128,39 @@ def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int =
122128
job_id = response.metadata.id_
123129

124130
start_time = time.time()
125-
count = 0
126131

127132
while True:
128133
if time.time() - start_time > timeout:
129134
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.")
135+
130136
current_job = client.job.get(job_id)
131137
current_job_status = current_job.status.state
132-
if count > 0 and current_job_status in [
133-
LeptonJobState.Completed,
134-
LeptonJobState.Failed,
135-
LeptonJobState.Unknown,
136-
]:
138+
if (
139+
current_job_status == LeptonJobState.Completed
140+
or current_job_status == LeptonJobState.Failed
141+
):
137142
break
138-
count += 1
143+
elif current_job_status == LeptonJobState.Unknown:
144+
logging.warning(
145+
f"Job {job_id} entered Unknown state, checking for up to {unknowns_grace_period} seconds every 2 seconds..."
146+
)
147+
unknown_start_time = time.time()
148+
recovered = False
149+
while time.time() - unknown_start_time < unknowns_grace_period:
150+
time.sleep(2)
151+
current_job = client.job.get(job_id)
152+
current_job_status = current_job.status.state
153+
if current_job_status != LeptonJobState.Unknown:
154+
logging.info(
155+
f"Job {job_id} recovered from Unknown state to {current_job_status}"
156+
)
157+
recovered = True
158+
break
159+
if not recovered:
160+
logging.error(
161+
f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds"
162+
)
163+
break
139164
time.sleep(poll_interval)
140165

141166
if current_job_status != LeptonJobState.Completed:

0 commit comments

Comments
 (0)