Skip to content

Commit 0b59b6f

Browse files
committed
cleanup
1 parent 4562ed4 commit 0b59b6f

File tree

2 files changed

+64
-42
lines changed

2 files changed

+64
-42
lines changed

packages/service-library/src/servicelib/background_task.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Final
77

88
from common_library.errors_classes import OsparcErrorMixin
9-
from tenacity import TryAgain
9+
from tenacity import TryAgain, retry_always, retry_if_exception_type
1010
from tenacity.asyncio import AsyncRetrying
1111
from tenacity.stop import stop_after_attempt
1212
from tenacity.wait import wait_fixed
@@ -26,14 +26,14 @@ class PeriodicTaskCancellationError(OsparcErrorMixin, Exception):
2626

2727

2828
class SleepUsingAsyncioEvent:
29-
"""Sleep strategy that waits on an event to be set."""
29+
"""Sleep strategy that waits on an event to be set or sleeps."""
3030

3131
def __init__(self, event: "asyncio.Event") -> None:
3232
self.event = event
3333

34-
async def __call__(self, timeout: float | None) -> None:
34+
async def __call__(self, delay: float | None) -> None:
3535
with contextlib.suppress(TimeoutError):
36-
await asyncio.wait_for(self.event.wait(), timeout=timeout)
36+
await asyncio.wait_for(self.event.wait(), timeout=delay)
3737
self.event.clear()
3838

3939

@@ -42,10 +42,18 @@ async def _periodic_scheduled_task(
4242
*,
4343
interval: datetime.timedelta,
4444
task_name: str,
45+
raise_on_error: bool,
4546
early_wake_up_event: asyncio.Event | None,
4647
**task_kwargs,
4748
) -> None:
48-
# NOTE: This retries forever unless cancelled
49+
"""periodically runs task with a given interval.
50+
If raise_on_error is False, the task will be retried indefinitely unless cancelled.
51+
If raise_on_error is True, the task will be retried indefinitely unless cancelled or an exception is raised.
52+
If early_wake_up_event is set, the task might be woken up earlier than interval when the event is set.
53+
54+
Raises:
55+
task exception if raise_on_error is True
56+
"""
4957
nap = (
5058
asyncio.sleep
5159
if early_wake_up_event is None
@@ -54,6 +62,8 @@ async def _periodic_scheduled_task(
5462
async for attempt in AsyncRetrying(
5563
sleep=nap,
5664
wait=wait_fixed(interval.total_seconds()),
65+
reraise=True,
66+
retry=retry_if_exception_type(TryAgain) if raise_on_error else retry_always,
5767
):
5868
with attempt:
5969
with (
@@ -74,6 +84,7 @@ def start_periodic_task(
7484
*,
7585
interval: datetime.timedelta,
7686
task_name: str,
87+
raise_on_error: bool = False,
7788
wait_before_running: datetime.timedelta = datetime.timedelta(0),
7889
early_wake_up_event: asyncio.Event | None = None,
7990
**kwargs,
@@ -89,6 +100,7 @@ def start_periodic_task(
89100
task,
90101
interval=interval,
91102
task_name=task_name,
103+
raise_on_error=raise_on_error,
92104
early_wake_up_event=early_wake_up_event,
93105
**kwargs,
94106
),
@@ -149,7 +161,11 @@ async def periodic_task(
149161
asyncio_task: asyncio.Task | None = None
150162
try:
151163
asyncio_task = start_periodic_task(
152-
task, interval=interval, task_name=task_name, **kwargs
164+
task,
165+
interval=interval,
166+
task_name=task_name,
167+
raise_on_error=raise_on_error,
168+
**kwargs,
153169
)
154170
yield asyncio_task
155171
finally:

packages/service-library/src/servicelib/redis/_decorators.py

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
import contextlib
33
import functools
44
import logging
5-
from collections.abc import Awaitable, Callable
6-
from typing import ParamSpec, TypeVar
5+
from collections.abc import Callable
6+
from typing import Any, Coroutine, ParamSpec, TypeVar
77

88
import redis.exceptions
9+
from servicelib.logging_utils import log_context
910

1011
from ..background_task import periodic_task
1112
from ._client import RedisClientSDK
12-
from ._constants import DEFAULT_LOCK_TTL
13+
from ._constants import DEFAULT_LOCK_TTL, SHUTDOWN_TIMEOUT_S
1314
from ._errors import CouldNotAcquireLockError
1415
from ._utils import auto_extend_lock
1516

@@ -24,26 +25,30 @@ def exclusive(
2425
*,
2526
lock_key: str | Callable[..., str],
2627
lock_value: bytes | str | None = None,
27-
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
28+
) -> Callable[
29+
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
30+
]:
2831
"""
29-
Define a method to run exclusively across
30-
processes by leveraging a Redis Lock.
31-
32-
parameters:
33-
redis: the redis client SDK
34-
lock_key: a string as the name of the lock (good practice: app_name:lock_name)
35-
lock_value: some additional data that can be retrieved by another client
36-
37-
Raises:
38-
- ValueError if used incorrectly
39-
- CouldNotAcquireLockError if the lock could not be acquired
32+
Define a method to run exclusively across
33+
processes by leveraging a Redis Lock.
34+
35+
parameters:
36+
redis: the redis client SDK
37+
lock_key: a string as the name of the lock (good practice: app_name:lock_name)
38+
lock_value: some additional data that can be retrieved by another client
39+
40+
Raises:
41+
- ValueError if used incorrectly
42+
- CouldNotAcquireLockError if the lock could not be acquired
4043
"""
4144

4245
if not lock_key:
4346
msg = "lock_key cannot be empty string!"
4447
raise ValueError(msg)
4548

46-
def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
49+
def decorator(
50+
func: Callable[P, Coroutine[Any, Any, R]],
51+
) -> Callable[P, Coroutine[Any, Any, R]]:
4752
@functools.wraps(func)
4853
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
4954
redis_lock_key = (
@@ -58,51 +63,52 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
5863
)
5964
assert isinstance(redis_client, RedisClientSDK) # nosec
6065

61-
lock_ttl = DEFAULT_LOCK_TTL
62-
63-
lock = client.create_lock(redis_lock_key, ttl=lock_ttl)
66+
lock = client.create_lock(redis_lock_key, ttl=DEFAULT_LOCK_TTL)
6467
if not await lock.acquire(token=lock_value):
6568
raise CouldNotAcquireLockError(lock=lock)
6669

6770
try:
6871
async with periodic_task(
6972
auto_extend_lock,
70-
interval=lock_ttl / 2,
73+
interval=DEFAULT_LOCK_TTL / 2,
7174
task_name=f"autoextend_exclusive_lock_{redis_lock_key}",
7275
raise_on_error=True,
7376
lock=lock,
7477
) as auto_extend_task:
7578
work_task = asyncio.create_task(
7679
func(*args, **kwargs), name=f"exclusive_{func.__name__}"
7780
)
78-
done, pending = await asyncio.wait(
81+
done, _pending = await asyncio.wait(
7982
[work_task, auto_extend_task],
8083
return_when=asyncio.FIRST_COMPLETED,
8184
)
82-
# the task finished first, let's return it
85+
# the task finished, let's return its result whatever it is
8386
if work_task in done:
8487
return await work_task
8588

86-
# the auto extend tasks finished first, meaning it could not extend the lock!
87-
# let's cancel the work task and raise an error
88-
work_task.cancel()
89-
with contextlib.suppress(asyncio.CancelledError, TimeoutError):
90-
# TODO: shall we raise the other errors?
91-
await asyncio.wait_for(work_task, timeout=3)
92-
89+
# the auto extend task can only finish if it raised an error, so it's bad
90+
_logger.error(
91+
"lock %s could not be auto-extended, cancelling work task! "
92+
"TIP: check connection to Redis DBs or look for Synchronous "
93+
"code that might block the auto-extender task.",
94+
lock.name,
95+
)
96+
with log_context(_logger, logging.DEBUG, msg="cancel work task"):
97+
work_task.cancel()
98+
with contextlib.suppress(asyncio.CancelledError, TimeoutError):
99+
# this will raise any other error that could have happened in the work task
100+
await asyncio.wait_for(
101+
work_task, timeout=SHUTDOWN_TIMEOUT_S
102+
)
103+
# return the error raised by the auto-extend task
93104
return await auto_extend_task
94105

95-
return result
96106
finally:
97107
with contextlib.suppress(redis.exceptions.LockNotOwnedError):
98-
# in the case where the lock would have been lost, this would raise
108+
# in the case where the lock would have been lost,
109+
# this would raise again and is not necessary
99110
await lock.release()
100111

101-
async with redis_client.lock_context(
102-
lock_key=redis_lock_key, lock_value=lock_value
103-
):
104-
return await func(*args, **kwargs)
105-
106112
return wrapper
107113

108114
return decorator

0 commit comments

Comments
 (0)