Skip to content

Commit 5d60b6a

Browse files
author
Andrei Neagu
committed
fixed tests and spedup
1 parent a77d79d commit 5d60b6a

File tree

2 files changed

+285
-5
lines changed

2 files changed

+285
-5
lines changed

packages/service-library/tests/long_running_tasks/conftest.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# pylint: disable=redefined-outer-name
2+
# pylint: disable=unused-argument
13
import logging
24
from collections.abc import AsyncIterator, Awaitable, Callable
35
from datetime import timedelta
@@ -17,15 +19,21 @@
1719

1820

1921
@pytest.fixture
20-
async def get_tasks_manager(
21-
faker: Faker, mocker: MockerFixture
22-
) -> AsyncIterator[
23-
Callable[[RedisSettings, RedisNamespace | None], Awaitable[TasksManager]]
24-
]:
22+
async def mock_cancel_tasks_check_interval(
23+
mocker: MockerFixture,
24+
) -> None:
2525
mocker.patch(
2626
"servicelib.long_running_tasks.task._CANCEL_TASKS_CHECK_INTERVAL",
2727
new=timedelta(seconds=TEST_CHECK_STALE_INTERVAL_S),
2828
)
29+
30+
31+
@pytest.fixture
32+
async def get_tasks_manager(
33+
mock_cancel_tasks_check_interval: None, faker: Faker
34+
) -> AsyncIterator[
35+
Callable[[RedisSettings, RedisNamespace | None], Awaitable[TasksManager]]
36+
]:
2937
managers: list[TasksManager] = []
3038

3139
async def _(
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
# pylint: disable=protected-access
2+
# pylint: disable=redefined-outer-name
3+
# pylint: disable=unused-argument
4+
5+
import asyncio
6+
import secrets
7+
from collections.abc import Awaitable, Callable
8+
from typing import Any, Final
9+
10+
import pytest
11+
from models_library.api_schemas_long_running_tasks.base import TaskProgress
12+
from pydantic import NonNegativeInt
13+
from pytest_mock import MockerFixture
14+
from servicelib.long_running_tasks import lrt_api
15+
from servicelib.long_running_tasks.errors import TaskNotFoundError
16+
from servicelib.long_running_tasks.models import TaskContext
17+
from servicelib.long_running_tasks.task import (
18+
RedisNamespace,
19+
TaskId,
20+
TaskRegistry,
21+
TasksManager,
22+
)
23+
from settings_library.redis import RedisSettings
24+
from tenacity import (
25+
AsyncRetrying,
26+
TryAgain,
27+
retry_if_exception_type,
28+
stop_after_delay,
29+
wait_fixed,
30+
)
31+
32+
pytest_simcore_core_services_selection = [
33+
"redis", # TODO: remove when done with this part
34+
]
35+
pytest_simcore_ops_services_selection = [
36+
"redis-commander",
37+
]
38+
39+
_RETRY_PARAMS: dict[str, Any] = {
40+
"reraise": True,
41+
"wait": wait_fixed(0.1),
42+
"stop": stop_after_delay(60),
43+
"retry": retry_if_exception_type((AssertionError, TryAgain)),
44+
}
45+
46+
47+
async def _task_echo_input(progress: TaskProgress, to_return: Any) -> Any:
48+
return to_return
49+
50+
51+
async def _task_always_raise(progress: TaskProgress) -> None:
52+
msg = "This task always raises an error"
53+
raise RuntimeError(msg)
54+
55+
56+
async def _task_takes_too_long(progress: TaskProgress) -> None:
57+
# Simulate a long-running task that is taking too much time
58+
await asyncio.sleep(1e9)
59+
60+
61+
TaskRegistry.register(_task_echo_input)
62+
TaskRegistry.register(_task_always_raise)
63+
TaskRegistry.register(_task_takes_too_long)
64+
65+
66+
@pytest.fixture
67+
def managers_count() -> NonNegativeInt:
68+
return 5
69+
70+
71+
@pytest.fixture
72+
def disable_stale_tasks_monitor(mocker: MockerFixture) -> None:
73+
# no need to autoremove stale tasks in these tests
74+
async def _to_replace(self: TasksManager) -> None:
75+
self._started_event_task_stale_tasks_monitor.set()
76+
77+
mocker.patch.object(
78+
TasksManager,
79+
"_stale_tasks_monitor",
80+
_to_replace,
81+
)
82+
83+
84+
@pytest.fixture
85+
async def tasks_managers(
86+
disable_stale_tasks_monitor: None,
87+
managers_count: NonNegativeInt,
88+
redis_service: RedisSettings,
89+
get_tasks_manager: Callable[
90+
[RedisSettings, RedisNamespace | None], Awaitable[TasksManager]
91+
],
92+
) -> list[TasksManager]:
93+
maanagers: list[TasksManager] = []
94+
for _ in range(managers_count):
95+
manager = await get_tasks_manager(redis_service, "same-service")
96+
maanagers.append(manager)
97+
98+
return maanagers
99+
100+
101+
def _get_task_manager(tasks_managers: list[TasksManager]) -> TasksManager:
102+
return secrets.choice(tasks_managers)
103+
104+
105+
async def _assert_task_status(
106+
task_manager: TasksManager, task_id: TaskId, *, is_done: bool
107+
) -> None:
108+
result = await lrt_api.get_task_status(task_manager, TaskContext(), task_id)
109+
assert result.done is is_done
110+
111+
112+
async def _assert_task_status_on_random_manager(
113+
tasks_managers: list[TasksManager], task_ids: list[TaskId], *, is_done: bool = True
114+
) -> None:
115+
for task_id in task_ids:
116+
result = await lrt_api.get_task_status(
117+
_get_task_manager(tasks_managers), TaskContext(), task_id
118+
)
119+
assert result.done is is_done
120+
121+
122+
async def _assert_task_status_done_on_all_managers(
123+
tasks_managers: list[TasksManager], task_id: TaskId, *, is_done: bool = True
124+
) -> None:
125+
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
126+
with attempt:
127+
await _assert_task_status(
128+
_get_task_manager(tasks_managers), task_id, is_done=is_done
129+
)
130+
131+
# check can do this form any task manager
132+
for manager in tasks_managers:
133+
await _assert_task_status(manager, task_id, is_done=is_done)
134+
135+
136+
async def _assert_list_tasks_from_all_managers(
137+
tasks_managers: list[TasksManager], task_context: TaskContext, task_count: int
138+
) -> None:
139+
for manager in tasks_managers:
140+
tasks = await lrt_api.list_tasks(manager, task_context)
141+
assert len(tasks) == task_count
142+
143+
144+
async def _assert_task_is_no_longer_present(
145+
tasks_managers: list[TasksManager], task_context: TaskContext, task_id: TaskId
146+
) -> None:
147+
with pytest.raises(TaskNotFoundError):
148+
await lrt_api.get_task_status(
149+
_get_task_manager(tasks_managers), task_context, task_id
150+
)
151+
152+
153+
_TASK_CONTEXT: Final[list[TaskContext | None]] = [{"a": "context"}, None]
154+
_IS_UNIQUE: Final[list[bool]] = [False, True]
155+
_TASK_COUNT: Final[list[int]] = [5]
156+
157+
158+
@pytest.mark.parametrize("task_count", _TASK_COUNT)
159+
@pytest.mark.parametrize("task_context", _TASK_CONTEXT)
160+
@pytest.mark.parametrize("is_unique", _IS_UNIQUE)
161+
@pytest.mark.parametrize("to_return", [{"key": "value"}])
162+
async def test_workflow_with_result(
163+
tasks_managers: list[TasksManager],
164+
task_count: int,
165+
is_unique: bool,
166+
task_context: TaskContext | None,
167+
to_return: Any,
168+
):
169+
saved_context = task_context or {}
170+
task_count = 1 if is_unique else task_count
171+
172+
task_ids: list[TaskId] = []
173+
for _ in range(task_count):
174+
task_id = await lrt_api.start_task(
175+
_get_task_manager(tasks_managers),
176+
_task_echo_input.__name__,
177+
unique=is_unique,
178+
task_name=None,
179+
task_context=task_context,
180+
fire_and_forget=False,
181+
to_return=to_return,
182+
)
183+
task_ids.append(task_id)
184+
185+
for task_id in task_ids:
186+
await _assert_task_status_done_on_all_managers(tasks_managers, task_id)
187+
188+
await _assert_list_tasks_from_all_managers(
189+
tasks_managers, saved_context, task_count=task_count
190+
)
191+
192+
# avoids tasks getting garbage collected
193+
await _assert_task_status_on_random_manager(tasks_managers, task_ids, is_done=True)
194+
195+
for task_id in task_ids:
196+
result = await lrt_api.get_task_result(
197+
_get_task_manager(tasks_managers), saved_context, task_id
198+
)
199+
assert result == to_return
200+
201+
await _assert_task_is_no_longer_present(tasks_managers, saved_context, task_id)
202+
203+
204+
@pytest.mark.parametrize("task_count", _TASK_COUNT)
205+
@pytest.mark.parametrize("task_context", _TASK_CONTEXT)
206+
@pytest.mark.parametrize("is_unique", _IS_UNIQUE)
207+
async def test_workflow_raises_error(
208+
tasks_managers: list[TasksManager],
209+
task_count: int,
210+
is_unique: bool,
211+
task_context: TaskContext | None,
212+
):
213+
saved_context = task_context or {}
214+
task_count = 1 if is_unique else task_count
215+
216+
task_ids: list[TaskId] = []
217+
for _ in range(task_count):
218+
task_id = await lrt_api.start_task(
219+
_get_task_manager(tasks_managers),
220+
_task_always_raise.__name__,
221+
unique=is_unique,
222+
task_name=None,
223+
task_context=task_context,
224+
fire_and_forget=False,
225+
)
226+
task_ids.append(task_id)
227+
228+
for task_id in task_ids:
229+
await _assert_task_status_done_on_all_managers(tasks_managers, task_id)
230+
231+
await _assert_list_tasks_from_all_managers(
232+
tasks_managers, saved_context, task_count=task_count
233+
)
234+
235+
# avoids tasks getting garbage collected
236+
await _assert_task_status_on_random_manager(tasks_managers, task_ids, is_done=True)
237+
238+
for task_id in task_ids:
239+
with pytest.raises(RuntimeError, match="This task always raises an error"):
240+
await lrt_api.get_task_result(
241+
_get_task_manager(tasks_managers), saved_context, task_id
242+
)
243+
244+
await _assert_task_is_no_longer_present(tasks_managers, saved_context, task_id)
245+
246+
247+
@pytest.mark.parametrize("task_count", _TASK_COUNT)
248+
@pytest.mark.parametrize("task_context", _TASK_CONTEXT)
249+
@pytest.mark.parametrize("is_unique", _IS_UNIQUE)
250+
async def test_remove_task(
251+
tasks_managers: list[TasksManager],
252+
task_count: int,
253+
is_unique: bool,
254+
task_context: TaskContext | None,
255+
):
256+
task_id = await lrt_api.start_task(
257+
_get_task_manager(tasks_managers),
258+
_task_takes_too_long.__name__,
259+
unique=is_unique,
260+
task_name=None,
261+
task_context=task_context,
262+
fire_and_forget=False,
263+
)
264+
saved_context = task_context or {}
265+
266+
await _assert_task_status_done_on_all_managers(
267+
tasks_managers, task_id, is_done=False
268+
)
269+
270+
await lrt_api.remove_task(_get_task_manager(tasks_managers), saved_context, task_id)
271+
272+
await _assert_task_is_no_longer_present(tasks_managers, saved_context, task_id)

0 commit comments

Comments
 (0)