Skip to content

Commit a427d71

Browse files
authored
[Core] Improve reliability of job monitor (#1441)
1 parent e581eb6 commit a427d71

File tree

3 files changed

+41
-18
lines changed

3 files changed

+41
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
- [IBM CE] Sanitize user_key in IBM CE to be RFC 1233 compliant
1515
- [CLI] Fix storage list error
1616
- [K8s] Fixed bug with first execution of K8s and Singularity
17+
- [Core] Prevent job monitor from stopping abruptly on iteration error causing hanging jobs
18+
1719

1820
## [v3.6.0]
1921

lithops/monitor.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ def _all_ready(self):
9090
"""
9191
Checks if all futures are ready, success or done
9292
"""
93-
return all([f.ready or f.success or f.done for f in self.futures])
93+
try:
94+
return all(f.ready or f.success or f.done for f in self.futures)
95+
except Exception:
96+
return False
9497

9598
def _check_new_futures(self, call_status, f):
9699
"""Checks if a functions returned new futures to track"""
@@ -426,41 +429,54 @@ def _generate_tokens(self, callids_running, callids_done):
426429
self.callids_running_processed.update(callids_running_to_process)
427430
self.callids_done_processed.update(callids_done_to_process)
428431

432+
def _poll_and_process_job_status(self, previous_log, log_time):
433+
"""
434+
Polls the storage backend for job status, updates futures,
435+
and prints status logs.
436+
437+
Returns:
438+
new_callids_done (set): New callids that were marked as done.
439+
previous_log (str): Updated log message.
440+
log_time (float): Updated log time counter.
441+
"""
442+
callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
443+
new_callids_done = callids_done - self.callids_done_processed_status
444+
445+
self._generate_tokens(callids_running, callids_done)
446+
self._tag_future_as_running(callids_running)
447+
self._tag_future_as_ready(callids_done)
448+
449+
previous_log, log_time = self._print_status_log(previous_log, log_time)
450+
451+
return new_callids_done, previous_log, log_time
452+
429453
def run(self):
430454
"""
431-
Run method
455+
Run method for the Storage job monitor thread.
432456
"""
433457
logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')
434458

435459
wait_dur_sec = self.monitoring_interval
436460
previous_log = None
437461
log_time = 0
438462

439-
def process_callids():
440-
nonlocal previous_log, log_time
441-
callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
442-
# verify if there are new callids_done and reduce the sleep
443-
new_callids_done = callids_done - self.callids_done_processed_status
444-
# generate tokens and mark futures as running/done
445-
self._generate_tokens(callids_running, callids_done)
446-
self._tag_future_as_running(callids_running)
447-
self._tag_future_as_ready(callids_done)
448-
previous_log, log_time = self._print_status_log(previous_log, log_time)
449-
450-
return new_callids_done
451-
452463
while not self._all_ready():
453464
time.sleep(wait_dur_sec)
454465
wait_dur_sec = self.monitoring_interval
455466
log_time += wait_dur_sec
456467

457468
if not self.should_run:
469+
logger.debug(f'ExecutorID {self.executor_id} - Monitor stopped externally')
458470
break
459471

460-
if len(process_callids()) > 0:
461-
wait_dur_sec = self.monitoring_interval / 5
472+
try:
473+
new_callids_done, previous_log, log_time = self._poll_and_process_job_status(previous_log, log_time)
474+
if new_callids_done:
475+
wait_dur_sec = self.monitoring_interval / 5
476+
except Exception as e:
477+
logger.error(f'ExecutorID {self.executor_id} - Error during monitor: {e}', exc_info=True)
462478

463-
process_callids()
479+
self._poll_and_process_job_status(previous_log, log_time)
464480

465481
logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')
466482

@@ -509,6 +525,9 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
509525
if not self.monitor.is_alive():
510526
self.monitor.start()
511527

528+
def is_alive(self):
529+
self.monitor.is_alive()
530+
512531
def remove(self, fs):
513532
if self.monitor and self.monitor.is_alive():
514533
self.monitor.remove_futures(fs)

lithops/wait.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
142142
threadpool_size=threadpool_size)
143143
else:
144144
while not _check_done(fs, return_when, download_results):
145+
if not job_monitor.is_alive():
146+
job_monitor.start(fs=fs)
145147
for executor_data in executors_data:
146148
new_data = _get_executor_data(fs, executor_data, pbar=pbar,
147149
throw_except=throw_except,

0 commit comments

Comments
 (0)