
Commit 65ddf2d

Merge branch 'master' into pr-osparc-fix-issue-removing-volume

2 parents 512b086 + a63e747

73 files changed: +1386 −1276 lines changed

packages/pytest-simcore/src/pytest_simcore/rabbit_service.py

Lines changed: 5 additions & 7 deletions
@@ -6,7 +6,6 @@
 import asyncio
 import logging
 from collections.abc import AsyncIterator, Awaitable, Callable
-from contextlib import suppress

 import aio_pika
 import pytest
@@ -141,12 +140,11 @@ async def ensure_parametrized_queue_is_empty(
     rabbitmq_client = create_rabbitmq_client("pytest-purger")

     async def _queue_messages_purger() -> None:
-        with suppress(aio_pika.exceptions.ChannelClosed):
-            assert rabbitmq_client._channel_pool  # noqa: SLF001
-            async with rabbitmq_client._channel_pool.acquire() as channel:  # noqa: SLF001
-                assert isinstance(channel, aio_pika.RobustChannel)
-                queue = await channel.get_queue(queue_name)
-                await queue.purge()
+        assert rabbitmq_client._channel_pool  # noqa: SLF001
+        async with rabbitmq_client._channel_pool.acquire() as channel:  # noqa: SLF001
+            assert isinstance(channel, aio_pika.RobustChannel)
+            queue = await channel.get_queue(queue_name)
+            await queue.purge()

     await _queue_messages_purger()
     yield
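
The change drops the suppress(ChannelClosed) guard, so a closed channel now fails the purge loudly instead of being silently skipped. A minimal standalone sketch of the same purge pattern (purge_queue is a hypothetical helper; the pool type assumed here is aio_pika.pool.Pool):

import aio_pika
from aio_pika.pool import Pool


async def purge_queue(channel_pool: Pool, queue_name: str) -> None:
    # acquire a pooled channel; a ChannelClosed error now propagates
    # to the caller instead of being suppressed
    async with channel_pool.acquire() as channel:
        assert isinstance(channel, aio_pika.RobustChannel)
        queue = await channel.get_queue(queue_name)
        await queue.purge()  # drop all messages pending in the queue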

packages/pytest-simcore/src/pytest_simcore/redis_service.py

Lines changed: 3 additions & 2 deletions
@@ -10,7 +10,6 @@
 import tenacity
 from pytest_mock import MockerFixture
 from redis.asyncio import Redis, from_url
-from servicelib.redis import _constants as redis_constants
 from settings_library.basic_types import PortInt
 from settings_library.redis import RedisDatabase, RedisSettings
 from tenacity.before_sleep import before_sleep_log
@@ -119,4 +118,6 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
 @pytest.fixture
 def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
     # lowered to allow CI to properly shutdown RedisClientSDK instances
-    mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
+    mocker.patch(
+        "servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25)
+    )
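
The patch target moves because mocker.patch must replace a name where it is looked up, not where it is defined. A minimal sketch, assuming servicelib.redis._client imports DEFAULT_SOCKET_TIMEOUT by name from its constants module (test_shutdown_quickly is a hypothetical test):

from datetime import timedelta

from pytest_mock import MockerFixture


def test_shutdown_quickly(mocker: MockerFixture) -> None:
    # patching servicelib.redis._constants.DEFAULT_SOCKET_TIMEOUT would not
    # affect code that did `from ._constants import DEFAULT_SOCKET_TIMEOUT`;
    # the binding inside the consuming module is the one to replace
    mocker.patch(
        "servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT",
        timedelta(seconds=0.25),
    )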

packages/service-library/src/servicelib/async_utils.py

Lines changed: 54 additions & 10 deletions
@@ -1,15 +1,22 @@
 import asyncio
+import contextlib
+import datetime
 import logging
 from collections import deque
+from collections.abc import Awaitable, Callable, Coroutine
 from contextlib import suppress
 from dataclasses import dataclass
 from functools import wraps
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, Deque
+from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar

 from . import tracing
 from .utils_profiling_middleware import dont_profile, is_profiling, profile_context

-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
+
+P = ParamSpec("P")
+R = TypeVar("R")
+

 if TYPE_CHECKING:
     Queue = asyncio.Queue
@@ -54,7 +61,7 @@ async def _safe_cancel(context: Context) -> None:
         await context.task
     except RuntimeError as e:
         if "Event loop is closed" in f"{e}":
-            logger.warning("event loop is closed and could not cancel %s", context)
+            _logger.warning("event loop is closed and could not cancel %s", context)
         else:
             raise

@@ -65,7 +72,7 @@ async def cancel_sequential_workers() -> None:
         await _safe_cancel(context)

     _sequential_jobs_contexts.clear()
-    logger.info("All run_sequentially_in_context pending workers stopped")
+    _logger.info("All run_sequentially_in_context pending workers stopped")


 # NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator.
@@ -118,25 +125,25 @@ def _get_context(args: Any, kwargs: dict) -> Context:
     arg_names = decorated_function.__code__.co_varnames[
         : decorated_function.__code__.co_argcount
     ]
-    search_args = dict(zip(arg_names, args))
+    search_args = dict(zip(arg_names, args, strict=False))
     search_args.update(kwargs)

-    key_parts: Deque[str] = deque()
+    key_parts: deque[str] = deque()
     for arg in target_args:
         sub_args = arg.split(".")
         main_arg = sub_args[0]
         if main_arg not in search_args:
-            raise ValueError(
+            msg = (
                 f"Expected '{main_arg}' in '{decorated_function.__name__}'"
                 f" arguments. Got '{search_args}'"
             )
+            raise ValueError(msg)
         context_key = search_args[main_arg]
         for attribute in sub_args[1:]:
             potential_key = getattr(context_key, attribute)
             if not potential_key:
-                raise ValueError(
-                    f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
-                )
+                msg = f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
+                raise ValueError(msg)
             context_key = potential_key

         key_parts.append(f"{decorated_function.__name__}_{context_key}")
@@ -205,3 +212,40 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None:
         return wrapper

     return decorator
+
+
+def delayed_start(
+    delay: datetime.timedelta,
+) -> Callable[
+    [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
+]:
+    def _decorator(
+        func: Callable[P, Coroutine[Any, Any, R]],
+    ) -> Callable[P, Coroutine[Any, Any, R]]:
+        @wraps(func)
+        async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+            await asyncio.sleep(delay.total_seconds())
+            return await func(*args, **kwargs)
+
+        return _wrapper
+
+    return _decorator
+
+
+async def cancel_wait_task(
+    task: asyncio.Task,
+    *,
+    max_delay: float | None = None,
+) -> None:
+    """Cancel an asyncio.Task and wait for it to finish.
+
+    :param task: task to be canceled
+    :param max_delay: duration (in seconds) to wait before giving
+        up the cancellation. If None it waits forever.
+    :raises TimeoutError: raised if the task cannot be cancelled.
+    """
+
+    task.cancel()
+    async with asyncio.timeout(max_delay):
+        with contextlib.suppress(asyncio.CancelledError):
+            await task
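
A hedged usage sketch of the two new helpers (requires Python 3.11+ for asyncio.timeout; _startup_probe and main are illustrative names, not part of the diff):

import asyncio
import datetime

from servicelib.async_utils import cancel_wait_task, delayed_start


@delayed_start(datetime.timedelta(seconds=1))
async def _startup_probe() -> None:
    print("probe fired")


async def main() -> None:
    task = asyncio.create_task(_startup_probe())
    await asyncio.sleep(0.1)  # the task is still inside its start delay
    # cancel and wait up to 5 seconds; raises TimeoutError if it will not stop
    await cancel_wait_task(task, max_delay=5)
    print("cancelled:", task.cancelled())  # True: cancelled during the delay


asyncio.run(main())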
packages/service-library/src/servicelib/background_task.py

Lines changed: 80 additions & 96 deletions
@@ -1,136 +1,115 @@
 import asyncio
 import contextlib
 import datetime
+import functools
 import logging
-from collections.abc import AsyncIterator, Awaitable, Callable
-from typing import Final
+from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
+from typing import Any, Final, ParamSpec, TypeVar

-from common_library.errors_classes import OsparcErrorMixin
-from tenacity import TryAgain
-from tenacity.asyncio import AsyncRetrying
-from tenacity.stop import stop_after_attempt
+from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
 from tenacity.wait import wait_fixed

-from .decorators import async_delayed
-from .logging_utils import log_catch, log_context
+from .async_utils import cancel_wait_task, delayed_start
+from .logging_utils import log_context

 _logger = logging.getLogger(__name__)


 _DEFAULT_STOP_TIMEOUT_S: Final[int] = 5
-_MAX_TASK_CANCELLATION_ATTEMPTS: Final[int] = 3
-
-
-class PeriodicTaskCancellationError(OsparcErrorMixin, Exception):
-    msg_template: str = "Could not cancel task '{task_name}'"


 class SleepUsingAsyncioEvent:
-    """Sleep strategy that waits on an event to be set."""
+    """Sleep strategy that waits on an event to be set or sleeps."""

     def __init__(self, event: "asyncio.Event") -> None:
         self.event = event

-    async def __call__(self, timeout: float | None) -> None:
+    async def __call__(self, delay: float | None) -> None:
         with contextlib.suppress(TimeoutError):
-            await asyncio.wait_for(self.event.wait(), timeout=timeout)
+            await asyncio.wait_for(self.event.wait(), timeout=delay)
         self.event.clear()


-async def _periodic_scheduled_task(
-    task: Callable[..., Awaitable[None]],
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def periodic(
     *,
     interval: datetime.timedelta,
-    task_name: str,
-    early_wake_up_event: asyncio.Event | None,
-    **task_kwargs,
-) -> None:
-    # NOTE: This retries forever unless cancelled
-    nap = (
-        asyncio.sleep
-        if early_wake_up_event is None
-        else SleepUsingAsyncioEvent(early_wake_up_event)
-    )
-    async for attempt in AsyncRetrying(
-        sleep=nap,
-        wait=wait_fixed(interval.total_seconds()),
-    ):
-        with attempt:
-            with log_context(
-                _logger,
-                logging.DEBUG,
-                msg=f"iteration {attempt.retry_state.attempt_number} of '{task_name}'",
-            ), log_catch(_logger):
-                await task(**task_kwargs)
+    raise_on_error: bool = False,
+    early_wake_up_event: asyncio.Event | None = None,
+) -> Callable[
+    [Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
+]:
+    """Calls the function periodically with a given interval.
+
+    Arguments:
+        interval -- the interval between calls
+
+    Keyword Arguments:
+        raise_on_error -- If False the function will be retried indefinitely unless cancelled.
+            If True the function will be retried indefinitely unless cancelled
+            or an exception is raised. (default: {False})
+        early_wake_up_event -- allows waking the function up before the interval has passed. (default: {None})
+
+    Returns:
+        coroutine that will be called periodically (runs forever)
+    """

+    def _decorator(
+        func: Callable[P, Coroutine[Any, Any, None]],
+    ) -> Callable[P, Coroutine[Any, Any, None]]:
+        nap = (
+            asyncio.sleep
+            if early_wake_up_event is None
+            else SleepUsingAsyncioEvent(early_wake_up_event)
+        )
+
+        @retry(
+            sleep=nap,
+            wait=wait_fixed(interval.total_seconds()),
+            reraise=True,
+            retry=(
+                retry_if_exception_type(TryAgain)
+                if raise_on_error
+                else retry_if_exception_type()
+            ),
+            before_sleep=before_sleep_log(_logger, logging.DEBUG),
+        )
+        @functools.wraps(func)
+        async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
+            await func(*args, **kwargs)
             raise TryAgain

+        return _wrapper

-def start_periodic_task(
+    return _decorator
+
+
+def create_periodic_task(
     task: Callable[..., Awaitable[None]],
     *,
     interval: datetime.timedelta,
     task_name: str,
+    raise_on_error: bool = False,
     wait_before_running: datetime.timedelta = datetime.timedelta(0),
     early_wake_up_event: asyncio.Event | None = None,
     **kwargs,
 ) -> asyncio.Task:
-    with log_context(
-        _logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
-    ):
-        delayed_periodic_scheduled_task = async_delayed(wait_before_running)(
-            _periodic_scheduled_task
-        )
-        return asyncio.create_task(
-            delayed_periodic_scheduled_task(
-                task,
-                interval=interval,
-                task_name=task_name,
-                early_wake_up_event=early_wake_up_event,
-                **kwargs,
-            ),
-            name=task_name,
-        )
-
+    @delayed_start(wait_before_running)
+    @periodic(
+        interval=interval,
+        raise_on_error=raise_on_error,
+        early_wake_up_event=early_wake_up_event,
+    )
+    async def _() -> None:
+        await task(**kwargs)

-async def cancel_task(
-    task: asyncio.Task,
-    *,
-    timeout: float | None,
-    cancellation_attempts: int = _MAX_TASK_CANCELLATION_ATTEMPTS,
-) -> None:
-    """Reliable task cancellation. Some libraries will just hang without
-    cancelling the task. It is important to retry the operation to provide
-    a timeout in that situation to avoid forever pending tasks.
-
-    :param task: task to be canceled
-    :param timeout: total duration (in seconds) to wait before giving
-        up the cancellation. If None it waits forever.
-    :raises TryAgain: raised if cannot cancel the task.
-    """
-    async for attempt in AsyncRetrying(
-        stop=stop_after_attempt(cancellation_attempts), reraise=True
-    ):
-        with attempt:
-            task.cancel()
-            _, pending = await asyncio.wait((task,), timeout=timeout)
-            if pending:
-                task_name = task.get_name()
-                _logger.info(
-                    "tried to cancel '%s' but timed-out! %s", task_name, pending
-                )
-                raise PeriodicTaskCancellationError(task_name=task_name)
-
-
-async def stop_periodic_task(
-    asyncio_task: asyncio.Task, *, timeout: float | None = None
-) -> None:
     with log_context(
-        _logger,
-        logging.DEBUG,
-        msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
+        _logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
     ):
-        await cancel_task(asyncio_task, timeout=timeout)
+        return asyncio.create_task(_(), name=task_name)


 @contextlib.asynccontextmanager
@@ -140,16 +119,21 @@ async def periodic_task(
     interval: datetime.timedelta,
     task_name: str,
     stop_timeout: float = _DEFAULT_STOP_TIMEOUT_S,
+    raise_on_error: bool = False,
     **kwargs,
 ) -> AsyncIterator[asyncio.Task]:
     asyncio_task: asyncio.Task | None = None
     try:
-        asyncio_task = start_periodic_task(
-            task, interval=interval, task_name=task_name, **kwargs
+        asyncio_task = create_periodic_task(
+            task,
+            interval=interval,
+            task_name=task_name,
+            raise_on_error=raise_on_error,
+            **kwargs,
         )
         yield asyncio_task
     finally:
         if asyncio_task is not None:
             # NOTE: this stopping is shielded to prevent the cancellation to propagate
             # into the stopping procedure
-            await asyncio.shield(stop_periodic_task(asyncio_task, timeout=stop_timeout))
+            await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout))
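
A hedged sketch of the reworked API: periodic_task drives a coroutine on a fixed interval and, on leaving the context, stops it via cancel_wait_task (_heartbeat and main are illustrative names, not part of the diff):

import asyncio
import datetime

from servicelib.background_task import periodic_task


async def _heartbeat() -> None:
    print("tick")


async def main() -> None:
    async with periodic_task(
        _heartbeat,
        interval=datetime.timedelta(seconds=1),
        task_name="heartbeat",
    ):
        # first call happens immediately, then roughly once per second
        await asyncio.sleep(3.5)
    # leaving the context cancels the task and awaits it (shielded)


asyncio.run(main())

Because raise_on_error defaults to False, an exception inside _heartbeat is swallowed by the retry policy and the next iteration runs on schedule; with raise_on_error=True the exception propagates and ends the background task.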
