Skip to content

Commit 8861220

Browse files
committed
moved exclusive periodic
1 parent 751d69e commit 8861220

File tree

1 file changed

+46
-3
lines changed

1 file changed

+46
-3
lines changed

packages/service-library/src/servicelib/background_task.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
import datetime
44
import functools
55
import logging
6+
import socket
67
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
78
from typing import Any, Final, ParamSpec, TypeVar
89

10+
import arrow
911
from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
1012
from tenacity.wait import wait_fixed
1113

1214
from .async_utils import cancel_wait_task, with_delay
1315
from .logging_utils import log_context
16+
from .redis import RedisClientSDK, exclusive
1417

1518
_logger = logging.getLogger(__name__)
1619

@@ -70,9 +73,11 @@ def _decorator(
7073
sleep=nap,
7174
wait=wait_fixed(interval.total_seconds()),
7275
reraise=True,
73-
retry=retry_if_exception_type(TryAgain)
74-
if raise_on_error
75-
else retry_if_exception_type(),
76+
retry=(
77+
retry_if_exception_type(TryAgain)
78+
if raise_on_error
79+
else retry_if_exception_type()
80+
),
7681
before_sleep=before_sleep_log(_logger, logging.DEBUG),
7782
)
7883
@functools.wraps(func)
@@ -135,3 +140,41 @@ async def periodic_task(
135140
# NOTE: this stopping is shielded to prevent the cancellation to propagate
136141
# into the stopping procedure
137142
await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout))
143+
144+
145+
def exclusive_periodic(
146+
client: RedisClientSDK,
147+
*,
148+
task_interval: datetime.timedelta,
149+
retry_after: datetime.timedelta,
150+
) -> Callable[
151+
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
152+
]:
153+
"""decorates a function to become exclusive and periodic.
154+
155+
Arguments:
156+
client -- The Redis client
157+
task_interval -- the task periodicity
158+
retry_after -- in case the exclusive lock cannot be acquired or is lost, this is the retry interval
159+
160+
Returns:
161+
Nothing, a periodic method does not return anything as it runs forever.
162+
"""
163+
164+
def _decorator(
165+
func: Callable[P, Coroutine[Any, Any, None]],
166+
) -> Callable[P, Coroutine[Any, Any, None]]:
167+
@periodic(interval=retry_after)
168+
@exclusive(
169+
client,
170+
lock_key=f"lock:exclusive_periodic_task:{func.__name__}",
171+
lock_value=f"locked since {arrow.utcnow().format()} by {client.client_name} on {socket.gethostname()}",
172+
)
173+
@periodic(interval=task_interval)
174+
@functools.wraps(func)
175+
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
176+
return await func(*args, **kwargs)
177+
178+
return _wrapper
179+
180+
return _decorator

0 commit comments

Comments
 (0)