Skip to content

Commit a23895f

Browse files
committed
Merge PR #794 into 16.0
Signed-off-by sbidoul
2 parents b68be0e + bd48ae6 commit a23895f

File tree

10 files changed

+170
-102
lines changed

10 files changed

+170
-102
lines changed

queue_job/README.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ a job, is transferred to the job according to an allow-list.
429429

430430
The default allow-list is `("tz", "lang", "allowed_company_ids", "force_company", "active_test")`. It can
431431
be customized in ``Base._job_prepare_context_before_enqueue_keys``.
432+
432433
**Bypass jobs on running Odoo**
433434

434435
When you are developing (ie: connector modules) you might want

queue_job/controllers/main.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,13 @@ def _enqueue_dependent_jobs(self, env, job):
7474
else:
7575
break
7676

77-
@http.route("/queue_job/runjob", type="http", auth="none", save_session=False)
77+
@http.route(
78+
"/queue_job/runjob",
79+
type="http",
80+
auth="none",
81+
save_session=False,
82+
readonly=False,
83+
)
7884
def runjob(self, db, job_uuid, **kw):
7985
http.request.session.db = db
8086
env = http.request.env(user=SUPERUSER_ID)

queue_job/job.py

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import uuid
1010
import weakref
1111
from datetime import datetime, timedelta
12-
from functools import total_ordering
1312
from random import randint
1413

1514
import odoo
@@ -104,7 +103,6 @@ def identity_exact_hasher(job_):
104103
return hasher
105104

106105

107-
@total_ordering
108106
class Job:
109107
"""A Job is a task to execute. It is the in-memory representation of a job.
110108
@@ -367,65 +365,6 @@ def job_record_with_same_identity_key(self):
367365
)
368366
return existing
369367

370-
# TODO to deprecate (not called anymore)
371-
@classmethod
372-
def enqueue(
373-
cls,
374-
func,
375-
args=None,
376-
kwargs=None,
377-
priority=None,
378-
eta=None,
379-
max_retries=None,
380-
description=None,
381-
channel=None,
382-
identity_key=None,
383-
):
384-
"""Create a Job and enqueue it in the queue. Return the job uuid.
385-
386-
This expects the arguments specific to the job to be already extracted
387-
from the ones to pass to the job function.
388-
389-
If the identity key is the same than the one in a pending job,
390-
no job is created and the existing job is returned
391-
392-
"""
393-
new_job = cls(
394-
func=func,
395-
args=args,
396-
kwargs=kwargs,
397-
priority=priority,
398-
eta=eta,
399-
max_retries=max_retries,
400-
description=description,
401-
channel=channel,
402-
identity_key=identity_key,
403-
)
404-
return new_job._enqueue_job()
405-
406-
# TODO to deprecate (not called anymore)
407-
def _enqueue_job(self):
408-
if self.identity_key:
409-
existing = self.job_record_with_same_identity_key()
410-
if existing:
411-
_logger.debug(
412-
"a job has not been enqueued due to having "
413-
"the same identity key (%s) than job %s",
414-
self.identity_key,
415-
existing.uuid,
416-
)
417-
return Job._load_from_db_record(existing)
418-
self.store()
419-
_logger.debug(
420-
"enqueued %s:%s(*%r, **%r) with uuid: %s",
421-
self.recordset,
422-
self.method_name,
423-
self.args,
424-
self.kwargs,
425-
self.uuid,
426-
)
427-
return self
428-
429368
@staticmethod
430369
def db_record_from_uuid(env, job_uuid):
431370
# TODO remove in 15.0 or 16.0
@@ -749,16 +688,6 @@ def __eq__(self, other):
749688
def __hash__(self):
750689
return self.uuid.__hash__()
751690

752-
def sorting_key(self):
753-
return self.eta, self.priority, self.date_created, self.seq
754-
755-
def __lt__(self, other):
756-
if self.eta and not other.eta:
757-
return True
758-
elif not self.eta and other.eta:
759-
return False
760-
return self.sorting_key() < other.sorting_key()
761-
762691
def db_record(self):
763692
return self.db_records_from_uuids(self.env, [self.uuid])
764693

queue_job/jobrunner/channels.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright 2015-2016 Camptocamp SA
33
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
44
import logging
5+
from collections import namedtuple
56
from functools import total_ordering
67
from heapq import heappop, heappush
78
from weakref import WeakValueDictionary
@@ -10,6 +11,7 @@
1011
from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES
1112

1213
NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED)
14+
JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq")
1315

1416
_logger = logging.getLogger(__name__)
1517

@@ -108,7 +110,7 @@ class ChannelJob:
108110
job that are necessary to prioritise them.
109111
110112
Channel jobs are comparable according to the following rules:
111-
* jobs with an eta come before all other jobs
113+
* jobs with an eta cannot be compared with jobs without
112114
* then jobs with a smaller eta come first
113115
* then jobs with a smaller priority come first
114116
* then jobs with a smaller creation time come first
@@ -135,14 +137,18 @@ class ChannelJob:
135137
>>> j3 < j1
136138
True
137139
138-
j4 and j5 comes even before j3, because they have an eta
140+
j4 and j5 have an eta, they cannot be compared with j3
139141
140142
>>> j4 = ChannelJob(None, None, 4,
141143
... seq=0, date_created=4, priority=9, eta=9)
142144
>>> j5 = ChannelJob(None, None, 5,
143145
... seq=0, date_created=5, priority=9, eta=9)
144-
>>> j4 < j5 < j3
146+
>>> j4 < j5
145147
True
148+
>>> j4 < j3
149+
Traceback (most recent call last):
150+
...
151+
TypeError: '<' not supported between instances of 'int' and 'NoneType'
146152
147153
j6 has same date_created and priority as j5 but a smaller eta
148154
@@ -153,7 +159,7 @@ class ChannelJob:
153159
154160
Here is the complete suite:
155161
156-
>>> j6 < j4 < j5 < j3 < j1 < j2
162+
>>> j6 < j4 < j5 and j3 < j1 < j2
157163
True
158164
159165
j0 has the same properties as j1 but they are not considered
@@ -173,14 +179,13 @@ class ChannelJob:
173179
174180
"""
175181

182+
__slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__")
183+
176184
def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta):
177185
self.db_name = db_name
178186
self.channel = channel
179187
self.uuid = uuid
180-
self.seq = seq
181-
self.date_created = date_created
182-
self.priority = priority
183-
self.eta = eta
188+
self._sorting_key = JobSortingKey(eta, priority, date_created, seq)
184189

185190
def __repr__(self):
186191
return "<ChannelJob %s>" % self.uuid
@@ -191,18 +196,36 @@ def __eq__(self, other):
191196
def __hash__(self):
192197
return id(self)
193198

199+
def set_no_eta(self):
200+
self._sorting_key = JobSortingKey(None, *self._sorting_key[1:])
201+
202+
@property
203+
def seq(self):
204+
return self._sorting_key.seq
205+
206+
@property
207+
def date_created(self):
208+
return self._sorting_key.date_created
209+
210+
@property
211+
def priority(self):
212+
return self._sorting_key.priority
213+
214+
@property
215+
def eta(self):
216+
return self._sorting_key.eta
217+
194218
def sorting_key(self):
195-
return self.eta, self.priority, self.date_created, self.seq
219+
# DEPRECATED
220+
return self._sorting_key
196221

197222
def sorting_key_ignoring_eta(self):
198-
return self.priority, self.date_created, self.seq
223+
return self._sorting_key[1:]
199224

200225
def __lt__(self, other):
201-
if self.eta and not other.eta:
202-
return True
203-
elif not self.eta and other.eta:
204-
return False
205-
return self.sorting_key() < other.sorting_key()
226+
# Do not compare job where ETA is set with job where it is not
227+
# If one job 'eta' is set, and the other is None, it raises TypeError
228+
return self._sorting_key < other._sorting_key
206229

207230

208231
class ChannelQueue:
@@ -312,7 +335,7 @@ def remove(self, job):
312335
def pop(self, now):
313336
while self._eta_queue and self._eta_queue[0].eta <= now:
314337
eta_job = self._eta_queue.pop()
315-
eta_job.eta = None
338+
eta_job.set_no_eta()
316339
self._queue.add(eta_job)
317340
if self.sequential and self._eta_queue and self._queue:
318341
eta_job = self._eta_queue[0]

queue_job/jobrunner/runner.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,17 @@ def __init__(
434434
self._stop = False
435435
self._stop_pipe = os.pipe()
436436

437+
def __del__(self):
438+
# pylint: disable=except-pass
439+
try:
440+
os.close(self._stop_pipe[0])
441+
except OSError:
442+
pass
443+
try:
444+
os.close(self._stop_pipe[1])
445+
except OSError:
446+
pass
447+
437448
@classmethod
438449
def from_environ_or_config(cls):
439450
scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get(

queue_job/models/queue_job.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import datetime, timedelta
77

88
from odoo import _, api, exceptions, fields, models
9-
from odoo.tools import config, html_escape
9+
from odoo.tools import config, html_escape, index_exists
1010

1111
from odoo.addons.base_sparse_field.models.fields import Serialized
1212

@@ -91,7 +91,7 @@ class QueueJob(models.Model):
9191
func_string = fields.Char(string="Task", readonly=True)
9292

9393
state = fields.Selection(STATES, readonly=True, required=True, index=True)
94-
priority = fields.Integer()
94+
priority = fields.Integer(group_operator=False)
9595
exc_name = fields.Char(string="Exception", readonly=True)
9696
exc_message = fields.Char(string="Exception Message", readonly=True, tracking=True)
9797
exc_info = fields.Text(string="Exception Info", readonly=True)
@@ -130,16 +130,21 @@ class QueueJob(models.Model):
130130
worker_pid = fields.Integer(readonly=True)
131131

132132
def init(self):
133-
self._cr.execute(
134-
"SELECT indexname FROM pg_indexes WHERE indexname = %s ",
135-
("queue_job_identity_key_state_partial_index",),
136-
)
137-
if not self._cr.fetchone():
133+
index_1 = "queue_job_identity_key_state_partial_index"
134+
index_2 = "queue_job_channel_date_done_date_created_index"
135+
if not index_exists(self._cr, index_1):
136+
# Used by Job.job_record_with_same_identity_key
138137
self._cr.execute(
139138
"CREATE INDEX queue_job_identity_key_state_partial_index "
140139
"ON queue_job (identity_key) WHERE state in ('pending', "
141140
"'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;"
142141
)
142+
if not index_exists(self._cr, index_2):
143+
# Used by <queue.job>.autovacuum
144+
self._cr.execute(
145+
"CREATE INDEX queue_job_channel_date_done_date_created_index "
146+
"ON queue_job (channel, date_done, date_created);"
147+
)
143148

144149
@api.depends("records")
145150
def _compute_record_ids(self):
@@ -408,6 +413,7 @@ def autovacuum(self):
408413
("date_cancelled", "<=", deadline),
409414
("channel", "=", channel.complete_name),
410415
],
416+
order="date_done, date_created",
411417
limit=1000,
412418
)
413419
if jobs:

queue_job/tests/common.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def _add_job(self, *args, **kwargs):
256256
if not job.identity_key or all(
257257
j.identity_key != job.identity_key for j in self.enqueued_jobs
258258
):
259+
self._prepare_context(job)
259260
self.enqueued_jobs.append(job)
260261

261262
patcher = mock.patch.object(job, "store")
@@ -274,6 +275,13 @@ def _add_job(self, *args, **kwargs):
274275
)
275276
return job
276277

278+
def _prepare_context(self, job):
279+
# pylint: disable=context-overridden
280+
job_model = job.job_model.with_context({})
281+
field_records = job_model._fields["records"]
282+
# Filter the context to simulate store/load of the job
283+
job.recordset = field_records.convert_to_write(job.recordset, job_model)
284+
277285
def __enter__(self):
278286
return self
279287

queue_job/tests/test_runner_runner.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,57 @@
33

44
# pylint: disable=odoo-addons-relative-import
55
# we are testing, we want to test as we were an external consumer of the API
6+
import os
7+
8+
from odoo.tests import BaseCase, tagged
9+
610
from odoo.addons.queue_job.jobrunner import runner
711

812
from .common import load_doctests
913

1014
load_tests = load_doctests(runner)
15+
16+
17+
@tagged("-at_install", "post_install")
18+
class TestRunner(BaseCase):
19+
@classmethod
20+
def _is_open_file_descriptor(cls, fd):
21+
try:
22+
os.fstat(fd)
23+
return True
24+
except OSError:
25+
return False
26+
27+
def test_runner_file_descriptor(self):
28+
a_runner = runner.QueueJobRunner.from_environ_or_config()
29+
30+
read_fd, write_fd = a_runner._stop_pipe
31+
self.assertTrue(self._is_open_file_descriptor(read_fd))
32+
self.assertTrue(self._is_open_file_descriptor(write_fd))
33+
34+
del a_runner
35+
36+
self.assertFalse(self._is_open_file_descriptor(read_fd))
37+
self.assertFalse(self._is_open_file_descriptor(write_fd))
38+
39+
def test_runner_file_closed_read_descriptor(self):
40+
a_runner = runner.QueueJobRunner.from_environ_or_config()
41+
42+
read_fd, write_fd = a_runner._stop_pipe
43+
os.close(read_fd)
44+
45+
del a_runner
46+
47+
self.assertFalse(self._is_open_file_descriptor(read_fd))
48+
self.assertFalse(self._is_open_file_descriptor(write_fd))
49+
50+
def test_runner_file_closed_write_descriptor(self):
51+
a_runner = runner.QueueJobRunner.from_environ_or_config()
52+
53+
read_fd, write_fd = a_runner._stop_pipe
54+
os.close(write_fd)
55+
56+
del a_runner
57+
58+
self.assertFalse(self._is_open_file_descriptor(read_fd))
59+
self.assertFalse(self._is_open_file_descriptor(write_fd))

0 commit comments

Comments
 (0)