Skip to content

Commit 2430054

Browse files
committed
moved exclusive start to its own module
1 parent 8861220 commit 2430054

File tree

7 files changed

+101
-127
lines changed

7 files changed

+101
-127
lines changed

packages/service-library/src/servicelib/background_task.py

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,14 @@
33
import datetime
44
import functools
55
import logging
6-
import socket
76
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
87
from typing import Any, Final, ParamSpec, TypeVar
98

10-
import arrow
119
from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
1210
from tenacity.wait import wait_fixed
1311

1412
from .async_utils import cancel_wait_task, with_delay
1513
from .logging_utils import log_context
16-
from .redis import RedisClientSDK, exclusive
1714

1815
_logger = logging.getLogger(__name__)
1916

@@ -140,41 +137,3 @@ async def periodic_task(
140137
# NOTE: this stopping is shielded to prevent the cancellation to propagate
141138
# into the stopping procedure
142139
await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout))
143-
144-
145-
def exclusive_periodic(
146-
client: RedisClientSDK,
147-
*,
148-
task_interval: datetime.timedelta,
149-
retry_after: datetime.timedelta,
150-
) -> Callable[
151-
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
152-
]:
153-
"""decorates a function to become exclusive and periodic.
154-
155-
Arguments:
156-
client -- The Redis client
157-
task_interval -- the task periodicity
158-
retry_after -- in case the exclusive lock cannot be acquired or is lost, this is the retry interval
159-
160-
Returns:
161-
Nothing, a periodic method does not return anything as it runs forever.
162-
"""
163-
164-
def _decorator(
165-
func: Callable[P, Coroutine[Any, Any, None]],
166-
) -> Callable[P, Coroutine[Any, Any, None]]:
167-
@periodic(interval=retry_after)
168-
@exclusive(
169-
client,
170-
lock_key=f"lock:exclusive_periodic_task:{func.__name__}",
171-
lock_value=f"locked since {arrow.utcnow().format()} by {client.client_name} on {socket.gethostname()}",
172-
)
173-
@periodic(interval=task_interval)
174-
@functools.wraps(func)
175-
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
176-
return await func(*args, **kwargs)
177-
178-
return _wrapper
179-
180-
return _decorator
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import datetime
2+
import functools
3+
from collections.abc import Callable, Coroutine
4+
from typing import Any, ParamSpec, TypeVar
5+
6+
from .background_task import periodic
7+
from .redis import RedisClientSDK, exclusive
8+
9+
P = ParamSpec("P")
10+
R = TypeVar("R")
11+
12+
13+
def exclusive_periodic(
14+
redis_client: RedisClientSDK | Callable[..., RedisClientSDK],
15+
*,
16+
task_interval: datetime.timedelta,
17+
retry_after: datetime.timedelta = datetime.timedelta(seconds=1),
18+
) -> Callable[
19+
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
20+
]:
21+
"""decorates a function to become exclusive and periodic.
22+
23+
Arguments:
24+
redis_client -- The Redis client SDK instance, or a callable returning one
25+
task_interval -- the task periodicity
26+
retry_after -- in case the exclusive lock cannot be acquired or is lost, this is the retry interval (defaults to 1 second)
27+
28+
Returns:
29+
Nothing, a periodic method does not return anything as it runs forever.
30+
"""
31+
32+
def _decorator(
33+
func: Callable[P, Coroutine[Any, Any, None]],
34+
) -> Callable[P, Coroutine[Any, Any, None]]:
35+
@periodic(interval=retry_after)
36+
@exclusive(
37+
redis_client,
38+
lock_key=f"lock:exclusive_periodic_task:{func.__name__}",
39+
)
40+
@periodic(interval=task_interval)
41+
@functools.wraps(func)
42+
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
43+
return await func(*args, **kwargs)
44+
45+
return _wrapper
46+
47+
return _decorator

packages/service-library/src/servicelib/redis/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from ._client import RedisClientSDK
22
from ._clients_manager import RedisClientsManager
33
from ._decorators import exclusive
4-
from ._distributed_locks_utils import create_exclusive_periodic_task
54
from ._errors import (
65
CouldNotAcquireLockError,
76
CouldNotConnectToRedisError,
@@ -19,7 +18,6 @@
1918
"RedisClientSDK",
2019
"RedisClientsManager",
2120
"RedisManagerDBConfig",
22-
"create_exclusive_periodic_task",
2321
)
2422

2523
# nopycln: file

packages/service-library/src/servicelib/redis/_decorators.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import contextlib
33
import functools
44
import logging
5+
import socket
56
from collections.abc import Callable, Coroutine
67
from datetime import timedelta
78
from typing import Any, Final, ParamSpec, TypeVar
89

10+
import arrow
911
import redis.exceptions
1012
from redis.asyncio.lock import Lock
1113

@@ -77,6 +79,9 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
7779
else redis_client
7880
)
7981
assert isinstance(client, RedisClientSDK) # nosec
82+
nonlocal lock_value
83+
if lock_value is None:
84+
lock_value = f"locked since {arrow.utcnow().format()} by {client.client_name} on {socket.gethostname()}"
8085

8186
lock = client.create_lock(redis_lock_key, ttl=DEFAULT_LOCK_TTL)
8287
if not await lock.acquire(

packages/service-library/src/servicelib/redis/_distributed_locks_utils.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

packages/service-library/tests/test_background_task.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
from servicelib.async_utils import cancel_wait_task
1818
from servicelib.background_task import create_periodic_task, periodic, periodic_task
1919

20+
pytest_simcore_core_services_selection = [
21+
"redis",
22+
]
23+
pytest_simcore_ops_services_selection = [
24+
"redis-commander",
25+
]
26+
27+
2028
_FAST_POLL_INTERVAL: Final[int] = 1
2129
_VERY_SLOW_POLL_INTERVAL: Final[int] = 100
2230

packages/service-library/tests/redis/test_distributed_locks_utils.py renamed to packages/service-library/tests/test_background_task_utils.py

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
1-
# pylint:disable=unused-variable
2-
# pylint:disable=unused-argument
3-
# pylint:disable=redefined-outer-name
4-
# pylint:disable=protected-access
1+
# pylint: disable=no-value-for-parameter
2+
# pylint: disable=redefined-outer-name
3+
# pylint: disable=unused-argument
4+
# pylint: disable=unused-variable
55

66

77
import asyncio
8-
from datetime import timedelta
8+
import datetime
9+
from collections.abc import AsyncIterator, Callable
10+
from contextlib import AbstractAsyncContextManager
911
from itertools import chain
10-
from unittest.mock import Mock
12+
from unittest import mock
1113

1214
import arrow
15+
import pytest
1316
from servicelib.async_utils import cancel_wait_task
14-
from servicelib.redis._client import RedisClientSDK
15-
from servicelib.redis._distributed_locks_utils import create_exclusive_periodic_task
16-
from servicelib.utils import logged_gather
17+
from servicelib.background_task_utils import exclusive_periodic
18+
from servicelib.redis import RedisClientSDK
19+
from settings_library.redis import RedisDatabase
1720
from tenacity import (
1821
AsyncRetrying,
1922
retry_if_exception_type,
@@ -29,14 +32,17 @@
2932
]
3033

3134

32-
async def _sleep_task(sleep_interval: float, on_sleep_events: Mock) -> None:
33-
on_sleep_events(arrow.utcnow())
34-
await asyncio.sleep(sleep_interval)
35-
print("Slept for", sleep_interval)
36-
on_sleep_events(arrow.utcnow())
35+
@pytest.fixture
36+
async def redis_client_sdk(
37+
get_redis_client_sdk: Callable[
38+
[RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]
39+
],
40+
) -> AsyncIterator[RedisClientSDK]:
41+
async with get_redis_client_sdk(RedisDatabase.RESOURCES) as client:
42+
yield client
3743

3844

39-
async def _assert_on_sleep_done(on_sleep_events: Mock, *, stop_after: float):
45+
async def _assert_on_sleep_done(on_sleep_events: mock.Mock, *, stop_after: float):
4046
async for attempt in AsyncRetrying(
4147
wait=wait_fixed(0.1),
4248
stop=stop_after_delay(stop_after),
@@ -52,20 +58,20 @@ async def _assert_task_completes_once(
5258
redis_client_sdk: RedisClientSDK,
5359
stop_after: float,
5460
) -> tuple[float, ...]:
55-
sleep_events = Mock()
56-
57-
started_task = create_exclusive_periodic_task(
58-
redis_client_sdk,
59-
_sleep_task,
60-
task_period=timedelta(seconds=1),
61-
task_name="pytest_sleep_task",
62-
sleep_interval=1,
63-
on_sleep_events=sleep_events,
64-
)
61+
@exclusive_periodic(redis_client_sdk, task_interval=datetime.timedelta(seconds=1))
62+
async def _sleep_task(sleep_interval: float, on_sleep_events: mock.Mock) -> None:
63+
on_sleep_events(arrow.utcnow())
64+
await asyncio.sleep(sleep_interval)
65+
print("Slept for", sleep_interval)
66+
on_sleep_events(arrow.utcnow())
67+
68+
sleep_events = mock.Mock()
69+
70+
task = asyncio.create_task(_sleep_task(1, sleep_events), name="pytest_sleep_task")
6571

6672
await _assert_on_sleep_done(sleep_events, stop_after=stop_after)
6773

68-
await cancel_wait_task(started_task, max_delay=5)
74+
await cancel_wait_task(task, max_delay=5)
6975

7076
events_timestamps: tuple[float, ...] = tuple(
7177
x.args[0].timestamp() for x in sleep_events.call_args_list
@@ -86,33 +92,33 @@ def test__check_elements_lower():
8692
assert not _check_elements_lower([1, 2, 4, 3, 5])
8793

8894

89-
async def test_create_exclusive_periodic_task_single(
95+
async def test_exclusive_periodic_decorator_single(
9096
redis_client_sdk: RedisClientSDK,
9197
):
9298
await _assert_task_completes_once(redis_client_sdk, stop_after=2)
9399

94100

95-
async def test_create_exclusive_periodic_task_parallel_all_finish(
101+
async def test_exclusive_periodic_decorator_parallel_all_finish(
96102
redis_client_sdk: RedisClientSDK,
97103
):
98104
parallel_tasks = 10
99-
results: list[tuple[float, float]] = await logged_gather(
105+
results = await asyncio.gather(
100106
*[
101107
_assert_task_completes_once(redis_client_sdk, stop_after=60)
102108
for _ in range(parallel_tasks)
103109
],
104-
reraise=False,
110+
return_exceptions=True,
105111
)
106112

107113
# check no error occurred
108114
assert [isinstance(x, tuple) for x in results].count(True) == parallel_tasks
109-
assert [x[0] < x[1] for x in results].count(True) == parallel_tasks
115+
assert [isinstance(x, Exception) for x in results].count(True) == 0
116+
valid_results = [x for x in results if isinstance(x, tuple)]
117+
assert [x[0] < x[1] for x in valid_results].count(True) == parallel_tasks
110118

111119
# sort by start time (task start order is not equal to the task lock acquisition order)
112-
sorted_results: list[tuple[float, float]] = sorted(results, key=lambda x: x[0])
113-
114-
# pylint:disable=unnecessary-comprehension
115-
flattened_results: list[float] = [x for x in chain(*sorted_results)] # noqa: C416
120+
sorted_results = sorted(valid_results, key=lambda x: x[0])
121+
flattened_results = list(chain(*sorted_results))
116122

117123
# NOTE all entries should be in increasing order;
118124
# this means that the `_sleep_task` ran sequentially

0 commit comments

Comments
 (0)