Skip to content

Commit 691541c

Browse files
committed
[fix] Prevent update_config task from detecting itself as duplicate openwisp#1277
The _is_update_in_progress() helper was detecting the currently executing task as a duplicate because Celery's inspect().active() snapshot included the running task itself. Replaced snapshot-based detection with an atomic per-device cache lock using Django's cache framework.
1 parent f3c99c4 commit 691541c

File tree

3 files changed

+97
-33
lines changed

3 files changed

+97
-33
lines changed

openwisp_controller/config/tests/test_config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,8 @@ def test_certificate_renew_invalidates_checksum_cache(self):
984984
vpnclient_cert.renew()
985985
# An additional call from cache invalidation of
986986
# DeviceGroupCommonName View
987-
self.assertEqual(mocked_delete.call_count, 3)
987+
# +1 call from _release_update_config_lock releasing the per-device lock
988+
self.assertEqual(mocked_delete.call_count, 4)
988989
del config.backend_instance
989990
self.assertNotEqual(config.get_cached_checksum(), old_checksum)
990991
config.refresh_from_db()

openwisp_controller/connection/tasks.py

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import logging
22
import time
3+
import uuid
34

45
import swapper
5-
from celery import current_app, shared_task
6+
from celery import shared_task
67
from celery.exceptions import SoftTimeLimitExceeded
8+
from django.core.cache import cache
79
from django.core.exceptions import ObjectDoesNotExist
810
from django.utils.translation import gettext_lazy as _
911
from swapper import load_model
@@ -13,19 +15,36 @@
1315
from .exceptions import NoWorkingDeviceConnectionError
1416

1517
logger = logging.getLogger(__name__)
16-
_TASK_NAME = "openwisp_controller.connection.tasks.update_config"
18+
_UPDATE_CONFIG_LOCK_KEY = "ow_update_config_{device_id}"
19+
# Lock timeout (in seconds) acts as a safety net to release the lock
20+
# in case the task crashes without proper cleanup.
21+
_UPDATE_CONFIG_LOCK_TIMEOUT = 300
1722

1823

19-
def _is_update_in_progress(device_id):
20-
active = current_app.control.inspect().active()
21-
if not active:
22-
return False
23-
# check if there's any other running task before adding it
24-
for task_list in active.values():
25-
for task in task_list:
26-
if task["name"] == _TASK_NAME and str(device_id) in task["args"]:
27-
return True
28-
return False
24+
def _acquire_update_config_lock(device_id):
    """
    Try to take the per-device ``update_config`` lock kept in the Django cache.

    A fresh random token is generated on every call and stored as the
    lock value under a key derived from ``device_id``. The entry expires
    after ``_UPDATE_CONFIG_LOCK_TIMEOUT`` seconds.

    Returns the token (a string) when the lock was taken, ``None`` when
    the key was already present. Keep the token: it is the proof of
    ownership expected by ``_release_update_config_lock``.
    """
    owner_token = str(uuid.uuid4())
    # cache.add only stores the value when the key is absent and reports
    # whether it did so, which is what makes it usable as a lock primitive
    acquired = cache.add(
        _UPDATE_CONFIG_LOCK_KEY.format(device_id=device_id),
        owner_token,
        timeout=_UPDATE_CONFIG_LOCK_TIMEOUT,
    )
    return owner_token if acquired else None
37+
38+
39+
def _release_update_config_lock(device_id, token):
    """
    Drop the per-device ``update_config`` lock, but only for its owner.

    The value currently stored under the device's lock key is compared
    with ``token``; the key is deleted only when the two are equal,
    otherwise nothing happens.

    NOTE(review): the read and the delete are two separate cache calls,
    so the check is not atomic as a whole.
    """
    key = _UPDATE_CONFIG_LOCK_KEY.format(device_id=device_id)
    if cache.get(key) != token:
        # held by someone else (or already expired): leave it untouched
        return
    cache.delete(key)
2948

3049

3150
@shared_task
@@ -48,15 +67,26 @@ def update_config(device_id):
4867
except ObjectDoesNotExist as e:
4968
logger.warning(f'update_config("{device_id}") failed: {e}')
5069
return
51-
if _is_update_in_progress(device_id):
70+
lock_token = _acquire_update_config_lock(device_id)
71+
if not lock_token:
72+
logger.info(
73+
f"update_config for device {device_id} is already in progress, skipping"
74+
)
5275
return
5376
try:
54-
device_conn = DeviceConnection.get_working_connection(device)
55-
except NoWorkingDeviceConnectionError:
56-
return
57-
else:
58-
logger.info(f"Updating {device} (pk: {device_id})")
59-
device_conn.update_config()
77+
try:
78+
device_conn = DeviceConnection.get_working_connection(device)
79+
except NoWorkingDeviceConnectionError as e:
80+
logger.warning(
81+
f"update_config for device {device_id}: "
82+
f"DeviceConnection.get_working_connection failed: {e}"
83+
)
84+
return
85+
else:
86+
logger.info(f"Updating {device} (pk: {device_id})")
87+
device_conn.update_config()
88+
finally:
89+
_release_update_config_lock(device_id, lock_token)
6090

6191

6292
# task timeout is SSH_COMMAND_TIMEOUT plus a 20% margin

openwisp_controller/connection/tests/test_models.py

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
)
2222
from ..exceptions import NoWorkingDeviceConnectionError
2323
from ..signals import is_working_changed
24-
from ..tasks import _TASK_NAME, update_config
24+
from ..tasks import (
25+
_acquire_update_config_lock,
26+
_release_update_config_lock,
27+
update_config,
28+
)
2529
from .utils import CreateConnectionsMixin
2630

2731
Config = load_model("config", "Config")
@@ -1026,20 +1030,19 @@ def _assert_applying_conf_test_command(mocked_exec):
10261030
@mock.patch.object(DeviceConnection, "update_config")
10271031
@mock.patch.object(DeviceConnection, "get_working_connection")
10281032
def test_device_update_config_in_progress(
1029-
self, mocked_get_working_connection, update_config, mocked_sleep
1033+
self, mocked_get_working_connection, mocked_update_config, mocked_sleep
10301034
):
10311035
conf = self._prepare_conf_object()
10321036

1033-
with mock.patch("celery.app.control.Inspect.active") as mocked_active:
1034-
mocked_active.return_value = {
1035-
"task": [{"name": _TASK_NAME, "args": [str(conf.device.pk)]}]
1036-
}
1037+
with mock.patch(
1038+
"openwisp_controller.connection.tasks._acquire_update_config_lock",
1039+
return_value=None,
1040+
):
10371041
conf.config = {"general": {"timezone": "UTC"}}
10381042
conf.full_clean()
10391043
conf.save()
1040-
mocked_active.assert_called_once()
10411044
mocked_get_working_connection.assert_not_called()
1042-
update_config.assert_not_called()
1045+
mocked_update_config.assert_not_called()
10431046

10441047
@mock.patch("time.sleep")
10451048
@mock.patch.object(DeviceConnection, "update_config")
@@ -1052,16 +1055,46 @@ def test_device_update_config_not_in_progress(
10521055
conf.device.deviceconnection_set.first()
10531056
)
10541057

1055-
with mock.patch("celery.app.control.Inspect.active") as mocked_active:
1056-
mocked_active.return_value = {
1057-
"task": [{"name": _TASK_NAME, "args": ["..."]}]
1058-
}
1058+
with mock.patch(
1059+
"openwisp_controller.connection.tasks._acquire_update_config_lock",
1060+
return_value="fake-lock-token",
1061+
), mock.patch(
1062+
"openwisp_controller.connection.tasks._release_update_config_lock",
1063+
) as mocked_release:
10591064
conf.config = {"general": {"timezone": "UTC"}}
10601065
conf.full_clean()
10611066
conf.save()
1062-
mocked_active.assert_called_once()
10631067
mocked_get_working_connection.assert_called_once()
10641068
mocked_update_config.assert_called_once()
1069+
mocked_release.assert_called_once_with(
1070+
str(conf.device.pk), "fake-lock-token"
1071+
)
1072+
1073+
def test_acquire_update_config_lock(self):
    """A held lock blocks further acquisition until its owner releases it."""
    device_id = "test-device-id"
    first_token = _acquire_update_config_lock(device_id)
    # registered immediately so the key is removed even if an assertion fails
    self.addCleanup(_release_update_config_lock, device_id, first_token)
    self.assertIsNotNone(first_token)
    # while the lock is held, another attempt must be refused
    second_attempt = _acquire_update_config_lock(device_id)
    self.assertIsNone(second_attempt)
    # once the owner releases it, the lock is available again
    _release_update_config_lock(device_id, first_token)
    new_token = _acquire_update_config_lock(device_id)
    self.addCleanup(_release_update_config_lock, device_id, new_token)
    self.assertIsNotNone(new_token)
1087+
1088+
def test_release_update_config_lock_wrong_token(self):
    """Releasing with a token that is not the stored one leaves the lock held."""
    device_id = "test-device-id"
    owner_token = _acquire_update_config_lock(device_id)
    # cleanup uses the real token so the key does not leak into other tests
    self.addCleanup(_release_update_config_lock, device_id, owner_token)
    self.assertIsNotNone(owner_token)
    # a release attempt carrying a foreign token must be a no-op
    _release_update_config_lock(device_id, "wrong-token")
    # ... so a new acquisition is still refused
    retry = _acquire_update_config_lock(device_id)
    self.assertIsNone(retry)
10651098

10661099
@mock.patch(_connect_path)
10671100
def test_schedule_command_called(self, connect_mocked):

0 commit comments

Comments
 (0)