diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index 978356cfd7..823e370a7c 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -20,6 +20,7 @@
"wizards/queue_requeue_job_views.xml",
"views/queue_job_menus.xml",
"data/queue_data.xml",
+ "data/queue_job_executor_cron.xml",
"data/queue_job_function_data.xml",
],
"assets": {
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index 28f3534848..17bad056b4 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -28,7 +28,9 @@
class RunJobController(http.Controller):
@classmethod
- def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
+ def _acquire_job(
+ cls, env: api.Environment, job_uuid: str | None = None
+ ) -> Job | None:
"""Acquire a job for execution.
- make sure it is in ENQUEUED state
@@ -38,30 +40,32 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
If successful, return the Job instance, otherwise return None. This
function may fail to acquire the job is not in the expected state or is
already locked by another worker.
+
+ If no job_uuid is given, acquire any available job in ENQUEUED state.
"""
- env.cr.execute(
- "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
- "FOR UPDATE SKIP LOCKED",
- (job_uuid, ENQUEUED),
- )
- if not env.cr.fetchone():
- _logger.warning(
- "was requested to run job %s, but it does not exist, "
- "or is not in state %s, or is being handled by another worker",
- job_uuid,
- ENQUEUED,
+ if job_uuid:
+ env.cr.execute(
+ "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
+ "FOR UPDATE SKIP LOCKED",
+ (job_uuid, ENQUEUED),
+ )
+ else:
+ env.cr.execute(
+ "SELECT uuid FROM queue_job WHERE state=%s LIMIT 1 "
+ "FOR UPDATE SKIP LOCKED",
+ (ENQUEUED,),
)
+ job_row = env.cr.fetchone()
+ if not job_row:
+ _logger.debug("no job to run")
return None
- job = Job.load(env, job_uuid)
+ job = Job.load(env, job_uuid=job_row[0])
assert job and job.state == ENQUEUED
job.set_started()
job.store()
env.cr.commit()
if not job.lock():
- _logger.warning(
- "was requested to run job %s, but it could not be locked",
- job_uuid,
- )
+ _logger.debug("could not acquire lock for job %s", job.uuid)
return None
return job
@@ -157,6 +161,7 @@ def retry_postpone(job, message, seconds=None):
_logger.debug("%s enqueue depends started", job)
cls._enqueue_dependent_jobs(env, job)
+ env.cr.commit()
_logger.debug("%s enqueue depends done", job)
@classmethod
@@ -181,13 +186,26 @@ def _get_failure_values(cls, job, traceback_txt, orig_exception):
save_session=False,
readonly=False,
)
- def runjob(self, db, job_uuid, **kw):
+ def runjob(self, db: str, job_uuid: str | None, **kw):
http.request.session.db = db
env = http.request.env(user=SUPERUSER_ID)
- job = self._acquire_job(env, job_uuid)
- if not job:
- return ""
- self._runjob(env, job)
+ run_as = env["ir.config_parameter"].get_param("queue_job.run_as")
+ if run_as == "cron":
+ crons = env["ir.cron"].search(
+ env["queue.job.executor"]._executor_cron_domain()
+ )
+ assert crons, "No queue_job executor cron found"
+ for cron in crons:
+ # TODO Awaking all of them is a bit wasteful although not very
+ # costly. Ideally we should awaken only one that is not already
+ # running.
+ cron._trigger()
+ else:
+ # Run in this http worker
+ job = self._acquire_job(env, job_uuid)
+ if not job:
+ return ""
+ self._runjob(env, job)
return ""
# flake8: noqa: C901
diff --git a/queue_job/data/queue_job_executor_cron.xml b/queue_job/data/queue_job_executor_cron.xml
new file mode 100644
index 0000000000..5439b6f051
--- /dev/null
+++ b/queue_job/data/queue_job_executor_cron.xml
@@ -0,0 +1,11 @@
+
+
+
+ Queue Job Executor
+
+ code
+ model._execute_ready_jobs()
+ 1
+ hours
+
+
diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py
index 6265dfe9cb..781236b064 100644
--- a/queue_job/models/__init__.py
+++ b/queue_job/models/__init__.py
@@ -2,5 +2,6 @@
from . import ir_model_fields
from . import queue_job
from . import queue_job_channel
+from . import queue_job_executor
from . import queue_job_function
from . import queue_job_lock
diff --git a/queue_job/models/queue_job_executor.py b/queue_job/models/queue_job_executor.py
new file mode 100644
index 0000000000..9e7923558c
--- /dev/null
+++ b/queue_job/models/queue_job_executor.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2026 ACSONE SA/NV ()
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+
+from odoo import api, models
+
+from ..controllers.main import RunJobController
+from ..job import Job
+
+_logger = logging.getLogger(__name__)
+
+
+class QueueJobExecutor(models.AbstractModel):
+ _name = "queue.job.executor"
+ _description = "Queue Job Executor"
+
+ @api.model
+ def _executor_cron_domain(self) -> list:
+ model_id = self.env["ir.model"]._get("queue.job.executor").id
+ return [
+ ("model_id", "=", model_id),
+ ("state", "=", "code"),
+ ("code", "=", "model._execute_ready_jobs()"),
+ ]
+
+ @api.model
+ def _ensure_executor_crons(self, capacity: int) -> None:
+ """Since Odoo cron can't run cron jobs in parallel, we create several.
+
+ `capacity` should be equal to the root channel capacity. If it's more,
+ it's wasteful. If it's less, job will stay in ENQUEUED state longer than
+ needed and loop back to PENDING due to the dead jobs requeuer.
+ """
+ if capacity < 1:
+ return
+ ref_cron = self.env.ref("queue_job.queue_job_executor_cron")
+ ref_cron.active = True
+ # remove clones
+ self.env["ir.cron"].with_context(active_test=False).search(
+ self._executor_cron_domain() + [("id", "!=", ref_cron.id)]
+ ).unlink()
+ # re-create desired number of clones
+ for _i in range(1, capacity):
+ ref_cron.copy()
+
+ @api.model
+ def _enable_executor_cron(self, capacity: int) -> None:
+ self._ensure_executor_crons(capacity)
+ self.env["ir.config_parameter"].set_param("queue_job.run_as", "cron")
+
+ @api.model
+ def _execute_job(self, job: Job) -> None:
+ RunJobController._runjob(self.env, job)
+
+ @api.model
+ def _execute_ready_jobs(self) -> None:
+ while job := RunJobController._acquire_job(self.env):
+ _logger.debug("executor cron running queue job %s", job.uuid)
+ self._execute_job(job)
diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py
index 3f0c92a2be..534f8e3a1a 100644
--- a/test_queue_job/tests/test_acquire_job.py
+++ b/test_queue_job/tests/test_acquire_job.py
@@ -1,11 +1,11 @@
# Copyright 2026 ACSONE SA/NV
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
-import logging
from unittest import mock
from odoo.tests import tagged
from odoo.addons.queue_job.controllers.main import RunJobController
+from odoo.addons.queue_job.job import ENQUEUED, STARTED
from .common import JobCommonCase
@@ -27,7 +27,7 @@ def test_acquire_enqueued_job(self):
mock_commit.assert_called_once()
self.assertIsNotNone(job)
self.assertEqual(job.uuid, "test_enqueued_job")
- self.assertEqual(job.state, "started")
+ self.assertEqual(job.state, STARTED)
self.assertTrue(
self.env["queue.job.lock"].search(
[("queue_job_id", "=", job_record.id)]
@@ -35,17 +35,29 @@ def test_acquire_enqueued_job(self):
"A job lock record should exist at this point",
)
+ def test_acquire_any_enqueued_job(self):
+ available_job_uuids = (
+ self.env["queue.job"].search([("state", "=", ENQUEUED)]).mapped("uuid")
+ )
+ with mock.patch.object(
+ self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
+ ) as mock_commit:
+ job = RunJobController._acquire_job(self.env)
+ mock_commit.assert_called_once()
+ self.assertIsNotNone(job)
+ self.assertIn(job.uuid, available_job_uuids)
+ self.assertEqual(job.state, STARTED)
+ self.assertTrue(
+ self.env["queue.job.lock"].search(
+ [("queue_job_id", "=", job.db_record().id)]
+ ),
+ "A job lock record should exist at this point",
+ )
+
def test_acquire_started_job(self):
- with (
- mock.patch.object(
- self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
- ) as mock_commit,
- self.assertLogs(level=logging.WARNING) as logs,
- ):
+ with mock.patch.object(
+ self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
+ ) as mock_commit:
job = RunJobController._acquire_job(self.env, "test_started_job")
mock_commit.assert_not_called()
self.assertIsNone(job)
- self.assertIn(
- "was requested to run job test_started_job, but it does not exist",
- logs.output[0],
- )