@@ -127,41 +127,39 @@ def move_data(
127127 job_id = response .metadata .id_
128128
129129 start_time = time .time ()
130- unknown_start_time = None
131- count = 0
132130
133131 while True :
134132 if time .time () - start_time > timeout :
135133 raise TimeoutError (f"Job { job_id } did not complete within { timeout } seconds." )
136134
137135 current_job = client .job .get (job_id )
138136 current_job_status = current_job .status .state
139- if count > 0 :
140- if (
141- current_job_status == LeptonJobState .Completed
142- or current_job_status == LeptonJobState .Failed
143- ):
144- break
145- elif current_job_status == LeptonJobState .Unknown :
146- if unknown_start_time is None :
147- unknown_start_time = time .time ()
148- logging .warning (
149- f"Job { job_id } entered Unknown state, giving it { unknowns_grace_period } seconds to recover..."
150- )
151-
152- elif time .time () - unknown_start_time > unknowns_grace_period :
153- logging .error (
154- f"Job { job_id } has been in Unknown state for more than { unknowns_grace_period } seconds"
155- )
156- break
157- else :
158- if unknown_start_time is not None :
137+ if (
138+ current_job_status == LeptonJobState .Completed
139+ or current_job_status == LeptonJobState .Failed
140+ ):
141+ break
142+ elif current_job_status == LeptonJobState .Unknown :
143+ logging .warning (
144+ f"Job { job_id } entered Unknown state, checking for up to { unknowns_grace_period } seconds every 2 seconds..."
145+ )
146+ unknown_start_time = time .time ()
147+ recovered = False
148+ while time .time () - unknown_start_time < unknowns_grace_period :
149+ time .sleep (2 )
150+ current_job = client .job .get (job_id )
151+ current_job_status = current_job .status .state
152+ if current_job_status != LeptonJobState .Unknown :
159153 logging .info (
160154 f"Job { job_id } recovered from Unknown state to { current_job_status } "
161155 )
162- unknown_start_time = None
163-
164- count += 1
156+ recovered = True
157+ break
158+ if not recovered :
159+ logging .error (
160+ f"Job { job_id } has been in Unknown state for more than { unknowns_grace_period } seconds"
161+ )
162+ break
165163 time .sleep (poll_interval )
166164
167165 if current_job_status != LeptonJobState .Completed :
0 commit comments