Skip to content

Commit cfb3dc0

Browse files
committed
re-write exclusive task starter
1 parent 445918c commit cfb3dc0

File tree

1 file changed

+13
-40
lines changed

1 file changed

+13
-40
lines changed

packages/service-library/src/servicelib/redis/_distributed_locks_utils.py

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,43 +5,13 @@
55

66
import arrow
77

8-
from ..background_task import start_periodic_task
8+
from ..background_task import periodic
99
from ._client import RedisClientSDK
1010
from ._decorators import exclusive
11-
from ._errors import CouldNotAcquireLockError
1211

1312
_logger = logging.getLogger(__name__)
1413

1514

16-
async def _exclusive_task_starter(
17-
client: RedisClientSDK,
18-
usr_tsk_task: Callable[..., Awaitable[None]],
19-
*,
20-
usr_tsk_interval: datetime.timedelta,
21-
usr_tsk_task_name: str,
22-
**kwargs,
23-
) -> None:
24-
lock_key = f"lock:exclusive_task_starter:{usr_tsk_task_name}"
25-
lock_value = f"locked since {arrow.utcnow().format()}"
26-
27-
try:
28-
await exclusive(client, lock_key=lock_key, lock_value=lock_value)(
29-
start_periodic_task
30-
)(
31-
usr_tsk_task,
32-
interval=usr_tsk_interval,
33-
task_name=usr_tsk_task_name,
34-
**kwargs,
35-
)
36-
except CouldNotAcquireLockError:
37-
_logger.debug(
38-
"Could not acquire lock '%s' with value '%s'", lock_key, lock_value
39-
)
40-
except Exception as e:
41-
_logger.exception(e) # noqa: TRY401
42-
raise
43-
44-
4515
def start_exclusive_periodic_task(
4616
client: RedisClientSDK,
4717
task: Callable[..., Awaitable[None]],
@@ -67,13 +37,16 @@ def start_exclusive_periodic_task(
6737
If Redis connectivity is lost, the periodic `_exclusive_task_starter` ensures the lock is
6838
reacquired
6939
"""
70-
return start_periodic_task(
71-
_exclusive_task_starter,
72-
interval=retry_after,
73-
task_name=f"exclusive_task_starter_{task_name}",
74-
client=client,
75-
usr_tsk_task=task,
76-
usr_tsk_interval=task_period,
77-
usr_tsk_task_name=task_name,
78-
**kwargs,
40+
41+
@periodic(interval=retry_after)
42+
@exclusive(
43+
client,
44+
lock_key=f"lock:exclusive_task_starter:{task_name}",
45+
lock_value=f"locked since {arrow.utcnow().format()}",
7946
)
47+
@periodic(interval=task_period)
48+
async def _() -> None:
49+
await task(**kwargs)
50+
51+
assert asyncio.iscoroutinefunction(_) # nosec
52+
return asyncio.create_task(_())

0 commit comments

Comments
 (0)