diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a0dce3b8e9..28f3534848 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -27,15 +27,48 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. + """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + @classmethod + def _try_perform_job(cls, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -46,7 +79,8 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: @@ -76,17 +110,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: @@ -95,26 +120,9 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) - return "" - - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -132,7 +140,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -142,19 +149,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"): @@ -168,6 +174,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( diff --git a/queue_job/job.py b/queue_job/job.py index 6cfe12f232..4c78072508 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -221,7 +221,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. """ @@ -241,13 +241,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -263,18 +261,15 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) - FOR UPDATE; + FOR UPDATE SKIP LOCKED; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 62347148e5..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index 335c072625..d3173a2198 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_records_from_uuids(self.env, [test_job.uuid]) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index 58890adf24..510276be63 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -13,23 +13,6 @@ @tagged("post_install", "-at_install") class TestRequeueDeadJob(JobCommonCase): - def _get_demo_job(self, uuid): - # job created during load of demo data - job = self.env["queue.job"].search( - [ - ("uuid", "=", uuid), - ], - limit=1, - ) - - self.assertTrue( - job, - f"Demo data queue job {uuid} should be loaded in order" - " to make this tests work", - ) - - return job - def get_locks(self, uuid, cr=None): """ Retrieve lock rows