Skip to content

Commit 09208c6

Browse files
committed
Replace snapshot-based detection with atomic cache lock
1 parent f3832c5 commit 09208c6

File tree

2 files changed

+58
-77
lines changed

2 files changed

+58
-77
lines changed

openwisp_controller/connection/tasks.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import time
33

44
import swapper
5-
from celery import current_app, shared_task
5+
from celery import shared_task
66
from celery.exceptions import SoftTimeLimitExceeded
7+
from django.core.cache import cache
78
from django.core.exceptions import ObjectDoesNotExist
89
from django.utils.translation import gettext_lazy as _
910
from swapper import load_model
@@ -13,23 +14,26 @@
1314
from .exceptions import NoWorkingDeviceConnectionError
1415

1516
logger = logging.getLogger(__name__)
16-
_TASK_NAME = "openwisp_controller.connection.tasks.update_config"
17+
_UPDATE_CONFIG_LOCK_KEY = "ow_update_config_{device_id}"
18+
# Lock timeout (in seconds) acts as a safety net to release the lock
19+
# in case the task crashes without proper cleanup.
20+
_UPDATE_CONFIG_LOCK_TIMEOUT = 300
1721

1822

19-
def _is_update_in_progress(device_id, current_task_id=None):
20-
active = current_app.control.inspect().active()
21-
if not active:
22-
return False
23-
# check if there's any other running task before adding it
24-
for task_list in active.values():
25-
for task in task_list:
26-
if (
27-
task["name"] == _TASK_NAME
28-
and str(device_id) in task["args"]
29-
and task["id"] != current_task_id
30-
):
31-
return True
32-
return False
23+
def _acquire_update_config_lock(device_id):
24+
"""
25+
Attempts to atomically acquire a per-device lock using the Django cache.
26+
Returns True if the lock was acquired, False if another task already holds it.
27+
"""
28+
lock_key = _UPDATE_CONFIG_LOCK_KEY.format(device_id=device_id)
29+
# cache.add is atomic: returns True only if the key doesn't already exist
30+
return cache.add(lock_key, True, timeout=_UPDATE_CONFIG_LOCK_TIMEOUT)
31+
32+
33+
def _release_update_config_lock(device_id):
34+
"""Releases the per-device update_config lock."""
35+
lock_key = _UPDATE_CONFIG_LOCK_KEY.format(device_id=device_id)
36+
cache.delete(lock_key)
3337

3438

3539
@shared_task(bind=True)
@@ -52,18 +56,21 @@ def update_config(self, device_id):
5256
except ObjectDoesNotExist as e:
5357
logger.warning(f'update_config("{device_id}") failed: {e}')
5458
return
55-
if _is_update_in_progress(device_id, current_task_id=self.request.id):
59+
if not _acquire_update_config_lock(device_id):
5660
logger.info(
5761
f"update_config for device {device_id} is already in progress, skipping"
5862
)
5963
return
6064
try:
61-
device_conn = DeviceConnection.get_working_connection(device)
62-
except NoWorkingDeviceConnectionError:
63-
return
64-
else:
65-
logger.info(f"Updating {device} (pk: {device_id})")
66-
device_conn.update_config()
65+
try:
66+
device_conn = DeviceConnection.get_working_connection(device)
67+
except NoWorkingDeviceConnectionError:
68+
return
69+
else:
70+
logger.info(f"Updating {device} (pk: {device_id})")
71+
device_conn.update_config()
72+
finally:
73+
_release_update_config_lock(device_id)
6774

6875

6976
# task timeout is SSH_COMMAND_TIMEOUT plus a 20% margin

openwisp_controller/connection/tests/test_models.py

Lines changed: 28 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
)
2222
from ..exceptions import NoWorkingDeviceConnectionError
2323
from ..signals import is_working_changed
24-
from ..tasks import _TASK_NAME, _is_update_in_progress, update_config
24+
from ..tasks import (
25+
_acquire_update_config_lock,
26+
_release_update_config_lock,
27+
update_config,
28+
)
2529
from .utils import CreateConnectionsMixin
2630

2731
Config = load_model("config", "Config")
@@ -1026,26 +1030,19 @@ def _assert_applying_conf_test_command(mocked_exec):
10261030
@mock.patch.object(DeviceConnection, "update_config")
10271031
@mock.patch.object(DeviceConnection, "get_working_connection")
10281032
def test_device_update_config_in_progress(
1029-
self, mocked_get_working_connection, update_config, mocked_sleep
1033+
self, mocked_get_working_connection, mocked_update_config, mocked_sleep
10301034
):
10311035
conf = self._prepare_conf_object()
10321036

1033-
with mock.patch("celery.app.control.Inspect.active") as mocked_active:
1034-
mocked_active.return_value = {
1035-
"task": [
1036-
{
1037-
"name": _TASK_NAME,
1038-
"args": [str(conf.device.pk)],
1039-
"id": "other-task-id",
1040-
}
1041-
]
1042-
}
1037+
with mock.patch(
1038+
"openwisp_controller.connection.tasks._acquire_update_config_lock",
1039+
return_value=False,
1040+
):
10431041
conf.config = {"general": {"timezone": "UTC"}}
10441042
conf.full_clean()
10451043
conf.save()
1046-
mocked_active.assert_called_once()
10471044
mocked_get_working_connection.assert_not_called()
1048-
update_config.assert_not_called()
1045+
mocked_update_config.assert_not_called()
10491046

10501047
@mock.patch("time.sleep")
10511048
@mock.patch.object(DeviceConnection, "update_config")
@@ -1058,53 +1055,30 @@ def test_device_update_config_not_in_progress(
10581055
conf.device.deviceconnection_set.first()
10591056
)
10601057

1061-
with mock.patch("celery.app.control.Inspect.active") as mocked_active:
1062-
mocked_active.return_value = {
1063-
"task": [{"name": _TASK_NAME, "args": ["..."], "id": "other-task-id"}]
1064-
}
1058+
with mock.patch(
1059+
"openwisp_controller.connection.tasks._acquire_update_config_lock",
1060+
return_value=True,
1061+
), mock.patch(
1062+
"openwisp_controller.connection.tasks._release_update_config_lock",
1063+
):
10651064
conf.config = {"general": {"timezone": "UTC"}}
10661065
conf.full_clean()
10671066
conf.save()
1068-
mocked_active.assert_called_once()
10691067
mocked_get_working_connection.assert_called_once()
10701068
mocked_update_config.assert_called_once()
10711069

1072-
def test_is_update_in_progress_ignores_current_task(self):
1073-
"""Regression test: _is_update_in_progress must not count
1074-
the calling task itself as a duplicate."""
1070+
def test_acquire_update_config_lock(self):
1071+
"""Test that the lock can be acquired and prevents duplicate acquisition."""
10751072
device_id = "test-device-id"
1076-
current_task_id = "current-task-id"
1077-
with mock.patch("celery.app.control.Inspect.active") as mocked_active:
1078-
mocked_active.return_value = {
1079-
"worker": [
1080-
{
1081-
"name": _TASK_NAME,
1082-
"args": [device_id],
1083-
"id": current_task_id,
1084-
}
1085-
]
1086-
}
1087-
result = _is_update_in_progress(device_id, current_task_id=current_task_id)
1088-
self.assertFalse(result)
1089-
1090-
def test_is_update_in_progress_detects_other_task(self):
1091-
"""_is_update_in_progress returns True when another task
1092-
for the same device is active."""
1093-
device_id = "test-device-id"
1094-
with mock.patch("celery.app.control.Inspect.active") as mocked_active:
1095-
mocked_active.return_value = {
1096-
"worker": [
1097-
{
1098-
"name": _TASK_NAME,
1099-
"args": [device_id],
1100-
"id": "other-task-id",
1101-
}
1102-
]
1103-
}
1104-
result = _is_update_in_progress(
1105-
device_id, current_task_id="current-task-id"
1106-
)
1107-
self.assertTrue(result)
1073+
# First acquisition should succeed
1074+
self.assertTrue(_acquire_update_config_lock(device_id))
1075+
# Second acquisition should fail (lock already held)
1076+
self.assertFalse(_acquire_update_config_lock(device_id))
1077+
# After releasing, acquisition should succeed again
1078+
_release_update_config_lock(device_id)
1079+
self.assertTrue(_acquire_update_config_lock(device_id))
1080+
# Cleanup
1081+
_release_update_config_lock(device_id)
11081082

11091083
@mock.patch(_connect_path)
11101084
def test_schedule_command_called(self, connect_mocked):

0 commit comments

Comments
 (0)