diff --git a/services/worker/src/worker/executor.py b/services/worker/src/worker/executor.py index ae7982337..5ab9cb6c2 100644 --- a/services/worker/src/worker/executor.py +++ b/services/worker/src/worker/executor.py @@ -146,12 +146,27 @@ def heartbeat(self) -> None: worker_state = self.get_state() if worker_state and worker_state["current_job_info"]: job_id = worker_state["current_job_info"]["job_id"] - try: - Queue().heartbeat(job_id=job_id) - except Exception as error: - logging.warning(f"Heartbeat failed for job {job_id} (AfterJobPlan might be running): {error}") - # Don't stop since the AfterJobPlan may be running - # self.stop() + max_retries = 6 + delay = 0.5 # seconds + + for attempt in range(max_retries): + try: + Queue().heartbeat(job_id=job_id) + return # success + except Exception as error: + if "JobDocument matching query does not exist" in str(error): + if attempt < max_retries - 1: + logging.warning( + f"Heartbeat retry {attempt + 1}/{max_retries} for job {job_id} - waiting {delay}s: {error}" + ) + time.sleep(delay) + else: + logging.error(f"Heartbeat failed after retries for job {job_id}: {error}") + else: + # Unrelated error, raise immediately + raise + + def kill_zombies(self) -> None: queue = Queue()