diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index a0db6751d..4cf063e81 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -365,23 +365,26 @@ def _query_requeue_dead_jobs(self): ELSE exc_info END) WHERE - id in ( - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id in ( - SELECT - id - FROM - queue_job - WHERE - state IN ('enqueued','started') - AND date_enqueued < - (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - ) - FOR UPDATE SKIP LOCKED + state IN ('enqueued','started') + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND ( + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + FOR UPDATE SKIP LOCKED + ) + OR NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + ) ) RETURNING uuid """ @@ -404,6 +407,12 @@ def requeue_dead_jobs(self): However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know they have been aborted. + + This also handles orphaned jobs (enqueued but never started, no lock). + This edge case occurs when the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). """ with closing(self.conn.cursor()) as cr: diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py index c6c82a2f4..180e1294e 100644 --- a/queue_job/tests/test_requeue_dead_job.py +++ b/queue_job/tests/test_requeue_dead_job.py @@ -131,3 +131,28 @@ def test_requeue_dead_jobs(self): # because we committed the cursor, the savepoint of the test method is # gone, and this would break TransactionCase cleanups self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) + + def test_requeue_orphaned_jobs(self): + uuid = "test_enqueued_job" + queue_job = self.create_dummy_job(uuid) + job_obj = Job.load(self.env, queue_job.uuid) + + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # job is now picked up by the requeue query (which includes orphaned jobs) + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) + + # clean up + queue_job.unlink() + self.env.cr.commit() # pylint: disable=E8102 + + # because we committed the cursor, the savepoint of the test method is + # gone, and this would break TransactionCase cleanups + self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)