Skip to content

Commit 15a8dae

Browse files
fix: run advisory_lock import/sync project in the right place AAP-47111 (#1333)
Fix a bug introduced in #1296 where we are running under the advisory lock not the real import/sync task but the proxy that schedules the job for rq and dispatcherd. This fix run the task under the lock in the right place. Also allows to merge both test_job_uniqueness and test_project_job_uniqueness tests Jira: AAP-47111 Signed-off-by: Alex <[email protected]>
1 parent 421a711 commit 15a8dae

File tree

2 files changed

+50
-100
lines changed

2 files changed

+50
-100
lines changed

src/aap_eda/tasks/project.py

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,18 @@
2828
logger = logging.getLogger(__name__)
2929
PROJECT_TASKS_QUEUE = "default"
3030

31-
# Wrap the django_rq job decorator so its processing is within our retry
32-
# code.
33-
3431

3532
def import_project(project_id: int) -> tp.Optional[str]:
3633
"""Import project async task.
3734
3835
Proxy for import_project_dispatcherd and import_project_rq.
3936
"""
40-
with advisory_lock(f"import_project_{project_id}", wait=False) as acquired:
41-
if not acquired:
42-
logger.debug(
43-
f"Another task already importing project {project_id}, exiting"
44-
)
45-
return None
46-
if features.DISPATCHERD:
47-
return import_project_dispatcherd(project_id)
37+
if features.DISPATCHERD:
38+
return import_project_dispatcherd(project_id)
4839

49-
# rq
50-
job_data = import_project_rq(project_id)
51-
return job_data
40+
# rq
41+
job_data = import_project_rq(project_id)
42+
return job_data
5243

5344

5445
def import_project_dispatcherd(project_id: int) -> str:
@@ -68,6 +59,17 @@ def import_project_rq(project_id: int) -> str:
6859

6960

7061
def _import_project(project_id: int):
62+
"""Wrap _import_project_no_lock with advisory lock."""
63+
with advisory_lock(f"import_project_{project_id}", wait=False) as acquired:
64+
if not acquired:
65+
logger.debug(
66+
f"Another task already importing project {project_id}, exiting"
67+
)
68+
return
69+
_import_project_no_lock(project_id)
70+
71+
72+
def _import_project_no_lock(project_id: int):
7173
"""Import project without lock.
7274
7375
This function is intended to be run by the tasking system inside the lock.
@@ -88,17 +90,11 @@ def sync_project(project_id: int) -> tp.Optional[str]:
8890
8991
Proxy for sync_project_dispatcherd and sync_project_rq.
9092
"""
91-
with advisory_lock(f"sync_project_{project_id}", wait=False) as acquired:
92-
if not acquired:
93-
logger.debug(
94-
f"Another task already syncing project {project_id}, exiting"
95-
)
96-
return None
97-
if features.DISPATCHERD:
98-
return sync_project_dispatcherd(project_id)
99-
# rq
100-
job_data = sync_project_rq(project_id)
101-
return job_data
93+
if features.DISPATCHERD:
94+
return sync_project_dispatcherd(project_id)
95+
# rq
96+
job_data = sync_project_rq(project_id)
97+
return job_data
10298

10399

104100
def sync_project_dispatcherd(project_id: int) -> str:
@@ -118,6 +114,21 @@ def sync_project_rq(project_id: int) -> str:
118114

119115

120116
def _sync_project(project_id: int):
117+
"""Wrap _sync_project_no_lock with advisory lock."""
118+
with advisory_lock(f"sync_project_{project_id}", wait=False) as acquired:
119+
if not acquired:
120+
logger.debug(
121+
f"Another task already syncing project {project_id}, exiting"
122+
)
123+
return
124+
_sync_project_no_lock(project_id)
125+
126+
127+
def _sync_project_no_lock(project_id: int):
128+
"""Sync project without lock.
129+
130+
This function is intended to be run by the tasking system inside the lock.
131+
"""
121132
logger.info(f"Task started: Sync project ( {project_id=} )")
122133

123134
project = models.Project.objects.get(pk=project_id)

tests/integration/test_advisory_lock.py

Lines changed: 14 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -29,104 +29,43 @@
2929
"fn_call": "monitor_project_tasks",
3030
"fn_args": [],
3131
},
32-
],
33-
ids=[
34-
"gather_analytics",
35-
"monitor_rulebook_processes",
36-
"monitor_project_tasks", # only scheduled in RQ
37-
],
38-
)
39-
@pytest.mark.django_db
40-
def test_job_uniqueness(module_data):
41-
call_log = []
42-
lock = Lock()
43-
44-
def _wrapper_call(shared_list):
45-
"""Patch fn mock in a thread-safe way inside each thread."""
46-
import importlib
47-
48-
module = importlib.import_module(module_data["module_path"])
49-
50-
importlib.reload(module)
51-
52-
with mock.patch(
53-
f"{module_data['module_path']}.{module_data['fn_mock']}",
54-
) as fn_mock:
55-
56-
def record_call(void=None):
57-
time.sleep(1)
58-
shared_list.append("called")
59-
60-
fn_mock.side_effect = record_call
61-
try:
62-
getattr(module, module_data["fn_call"])(
63-
*module_data["fn_args"],
64-
)
65-
except Exception as e:
66-
pytest.fail(
67-
f"Exception raised in {module_data['fn_call']}: {e}"
68-
)
69-
70-
def thread_safe_call():
71-
_wrapper_call(ThreadSafeList(call_log, lock))
72-
73-
with ThreadPoolExecutor(max_workers=2) as executor:
74-
futures = [
75-
executor.submit(thread_safe_call),
76-
executor.submit(thread_safe_call),
77-
]
78-
wait(futures, timeout=3)
79-
80-
assert len(call_log) == 1
81-
82-
83-
@pytest.mark.parametrize(
84-
"module_data",
85-
[
8632
{
8733
"module_path": "aap_eda.tasks.project",
88-
"fn_mock_dispatcherd": "import_project_dispatcherd",
89-
"fn_mock_rq": "import_project_rq",
90-
"fn_call": "import_project",
34+
"fn_mock": "_import_project_no_lock",
35+
"fn_call": "_import_project",
9136
"fn_args": [1],
9237
},
9338
{
9439
"module_path": "aap_eda.tasks.project",
95-
"fn_mock_dispatcherd": "sync_project_dispatcherd",
96-
"fn_mock_rq": "sync_project_rq",
97-
"fn_call": "sync_project",
40+
"fn_mock": "_sync_project_no_lock",
41+
"fn_call": "_sync_project",
9842
"fn_args": [1],
9943
},
10044
],
10145
ids=[
102-
"import_project",
103-
"sync_project",
46+
"gather_analytics",
47+
"monitor_rulebook_processes",
48+
"monitor_project_tasks", # only scheduled in RQ
49+
"_import_project",
50+
"_sync_project",
10451
],
10552
)
10653
@pytest.mark.django_db
107-
def test_project_job_uniqueness(module_data):
54+
def test_job_uniqueness(module_data):
10855
call_log = []
10956
lock = Lock()
11057

11158
def _wrapper_call(shared_list):
112-
"""Patch fn_mock in a thread-safe way inside each thread."""
59+
"""Patch fn mock in a thread-safe way inside each thread."""
11360
import importlib
11461

115-
from aap_eda.settings import features
116-
11762
module = importlib.import_module(module_data["module_path"])
11863

11964
importlib.reload(module)
12065

121-
mock_fn_target = (
122-
f"{module_data['fn_mock_dispatcherd']}"
123-
if features.DISPATCHERD
124-
else f"{module_data['fn_mock_rq']}"
125-
)
126-
127-
mock_path = f"{module_data['module_path']}.{mock_fn_target}"
128-
129-
with mock.patch(mock_path) as fn_mock:
66+
with mock.patch(
67+
f"{module_data['module_path']}.{module_data['fn_mock']}",
68+
) as fn_mock:
13069

13170
def record_call(void=None):
13271
time.sleep(1)

0 commit comments

Comments
 (0)