-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_requeue_dead_job.py
More file actions
100 lines (78 loc) · 3.05 KB
/
test_requeue_dead_job.py
File metadata and controls
100 lines (78 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Copyright 2025 ACSONE SA/NV
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
from contextlib import closing
from datetime import datetime, timedelta
from odoo.tests import tagged
from odoo.addons.queue_job.job import Job
from odoo.addons.queue_job.jobrunner.runner import Database
from .common import JobCommonCase
@tagged("post_install", "-at_install")
class TestRequeueDeadJob(JobCommonCase):
    """Tests for job lock records and the jobrunner's dead-job requeue query."""

    def get_locks(self, uuid, cr=None):
        """Retrieve the lock rows of the job identified by ``uuid``.

        The rows are selected with ``FOR NO KEY UPDATE SKIP LOCKED``, so a
        row that is currently locked by another transaction is left out of
        the result instead of blocking the query.

        :param uuid: uuid of the queue job whose lock rows are fetched
        :param cr: cursor to run the query on; defaults to ``self.env.cr``
        :return: list of ``(queue_job_id,)`` tuples
        """
        if cr is None:
            cr = self.env.cr

        cr.execute(
            """
            SELECT
                queue_job_id
            FROM
                queue_job_lock
            WHERE
                queue_job_id IN (
                    SELECT
                        id
                    FROM
                        queue_job
                    WHERE
                        uuid = %s
                )
            FOR NO KEY UPDATE SKIP LOCKED
            """,
            [uuid],
        )

        return cr.fetchall()

    def test_add_lock_record(self):
        """Starting a job must leave exactly one lock row for it."""
        queue_job = self._get_demo_job("test_started_job")
        self.assertEqual(len(queue_job), 1)
        job_obj = Job.load(self.env, queue_job.uuid)

        job_obj.set_started()
        self.assertEqual(job_obj.state, "started")

        locks = self.get_locks(job_obj.uuid)

        self.assertEqual(1, len(locks))

    def test_lock(self):
        """A lock taken on the test cursor must hide the row from another cursor."""
        queue_job = self._get_demo_job("test_started_job")
        job_obj = Job.load(self.env, queue_job.uuid)

        job_obj.set_started()
        job_obj.lock()

        # Query from a separate cursor: SKIP LOCKED only skips rows locked
        # by *other* transactions, so the test cursor itself cannot be used.
        with closing(self.env.registry.cursor()) as new_cr:
            locks = self.get_locks(job_obj.uuid, new_cr)

            # Row should be locked
            self.assertEqual(0, len(locks))

    def test_requeue_dead_jobs(self):
        """A started job enqueued in the past is returned by the requeue query."""
        queue_job = self._get_demo_job("test_enqueued_job")
        job_obj = Job.load(self.env, queue_job.uuid)

        job_obj.set_enqueued()
        job_obj.set_started()
        # simulate that the job was enqueued one minute ago
        job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
        job_obj.store()

        # requeue dead jobs using current cursor
        query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
        self.env.cr.execute(query)

        # Each returned row carries the uuid in its first column.  Build a
        # real list and use assertIn: passing a generator expression to
        # assertTrue always succeeds because a generator object is truthy.
        uuids_requeued = [row[0] for row in self.env.cr.fetchall()]
        self.assertIn(queue_job.uuid, uuids_requeued)

    def test_requeue_orphaned_jobs(self):
        """An enqueued job that was never started is returned by the requeue query."""
        queue_job = self._get_demo_job("test_enqueued_job")
        job_obj = Job.load(self.env, queue_job.uuid)

        # Only enqueue the job; do not set it to started, to simulate a
        # system shutdown that happened before the job began running.
        job_obj.set_enqueued()
        job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
        job_obj.store()

        # job is now picked up by the requeue query (which includes orphaned jobs)
        query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
        self.env.cr.execute(query)

        # See test_requeue_dead_jobs: assert on a concrete list, not on a
        # generator expression, so the check can actually fail.
        uuids_requeued = [row[0] for row in self.env.cr.fetchall()]
        self.assertIn(queue_job.uuid, uuids_requeued)