Skip to content

Commit ae2f4f1

Browse files
author
Andrei Neagu
committed
fixed most tests
1 parent 7f24c65 commit ae2f4f1

File tree

7 files changed

+101
-20
lines changed

7 files changed

+101
-20
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_manager.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import datetime
22

33
from aiohttp import web
4+
from settings_library.rabbit import RabbitSettings
45
from settings_library.redis import RedisSettings
56

7+
from ...long_running_tasks import lrt_api
68
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
7-
from ...long_running_tasks.models import TaskContext
9+
from ...long_running_tasks.models import RabbitNamespace, TaskContext
810
from ...long_running_tasks.task import RedisNamespace, TasksManager
11+
from ...rabbitmq._client_rpc import RabbitMQRPCClient
912
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
1013
from ._request import get_task_context
1114

@@ -17,7 +20,9 @@ def __init__(
1720
stale_task_check_interval: datetime.timedelta,
1821
stale_task_detect_timeout: datetime.timedelta,
1922
redis_settings: RedisSettings,
23+
rabbit_settings: RabbitSettings,
2024
redis_namespace: RedisNamespace,
25+
rabbit_namespace: RabbitNamespace,
2126
):
2227
self._app = app
2328
self._tasks_manager = TasksManager(
@@ -26,17 +31,52 @@ def __init__(
2631
redis_settings=redis_settings,
2732
redis_namespace=redis_namespace,
2833
)
34+
self._rabbit_namespace = rabbit_namespace
35+
self.rabbit_settings = rabbit_settings
36+
self._rpc_server: RabbitMQRPCClient | None = None
37+
self._rpc_client: RabbitMQRPCClient | None = None
2938

3039
@property
3140
def tasks_manager(self) -> TasksManager:
3241
return self._tasks_manager
3342

43+
@property
44+
def rpc_server(self) -> RabbitMQRPCClient:
45+
assert self._rpc_server is not None # nosec
46+
return self._rpc_server
47+
48+
@property
49+
def rpc_client(self) -> RabbitMQRPCClient:
50+
assert self._rpc_client is not None # nosec
51+
return self._rpc_client
52+
53+
@property
54+
def rabbit_namespace(self) -> RabbitNamespace:
55+
return self._rabbit_namespace
56+
3457
async def setup(self) -> None:
3558
await self._tasks_manager.setup()
59+
self._rpc_server = await RabbitMQRPCClient.create(
60+
client_name=f"lrt-server-{self.rabbit_namespace}",
61+
settings=self.rabbit_settings,
62+
)
63+
self._rpc_client = await RabbitMQRPCClient.create(
64+
client_name=f"lrt-client-{self.rabbit_namespace}",
65+
settings=self.rabbit_settings,
66+
)
67+
await lrt_api.register_rabbit_routes(self)
3668

3769
async def teardown(self) -> None:
3870
await self._tasks_manager.teardown()
3971

72+
if self._rpc_server is not None:
73+
await self._rpc_server.close()
74+
self._rpc_server = None
75+
76+
if self._rpc_client is not None:
77+
await self._rpc_client.close()
78+
self._rpc_client = None
79+
4080
@staticmethod
4181
def get_task_context(request: web.Request) -> TaskContext:
4282
return get_task_context(request)

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ async def list_tasks(request: web.Request) -> web.Response:
2929
abort_href=f"{request.app.router['remove_task'].url_for(task_id=t.task_id)}",
3030
)
3131
for t in await lrt_api.list_tasks(
32+
long_running_manager.rpc_client,
3233
long_running_manager,
3334
long_running_manager.get_task_context(request),
3435
)
@@ -42,6 +43,7 @@ async def get_task_status(request: web.Request) -> web.Response:
4243
long_running_manager = get_long_running_manager(request.app)
4344

4445
task_status = await lrt_api.get_task_status(
46+
long_running_manager.rpc_client,
4547
long_running_manager,
4648
long_running_manager.get_task_context(request),
4749
path_params.task_id,
@@ -56,6 +58,7 @@ async def get_task_result(request: web.Request) -> web.Response | Any:
5658

5759
# NOTE: this might raise an exception that will be catched by the _error_handlers
5860
return await lrt_api.get_task_result(
61+
long_running_manager.rpc_client,
5962
long_running_manager,
6063
long_running_manager.get_task_context(request),
6164
path_params.task_id,
@@ -68,6 +71,7 @@ async def remove_task(request: web.Request) -> web.Response:
6871
long_running_manager = get_long_running_manager(request.app)
6972

7073
await lrt_api.remove_task(
74+
long_running_manager.rpc_client,
7175
long_running_manager,
7276
long_running_manager.get_task_context(request),
7377
path_params.task_id,

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from aiohttp.web import HTTPException
99
from common_library.json_serialization import json_dumps
1010
from pydantic import AnyHttpUrl, TypeAdapter
11+
from settings_library.rabbit import RabbitSettings
1112
from settings_library.redis import RedisSettings
1213

1314
from ...aiohttp import status
@@ -20,7 +21,7 @@
2021
DEFAULT_STALE_TASK_CHECK_INTERVAL,
2122
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
2223
)
23-
from ...long_running_tasks.models import TaskContext, TaskGet
24+
from ...long_running_tasks.models import RabbitNamespace, TaskContext, TaskGet
2425
from ...long_running_tasks.task import RedisNamespace, RegisteredTaskName
2526
from ..typing_extension import Handler
2627
from . import _routes
@@ -63,6 +64,7 @@ async def start_long_running_task(
6364
task_id = None
6465
try:
6566
task_id = await lrt_api.start_task(
67+
long_running_manager.rpc_client,
6668
long_running_manager,
6769
registerd_task_name,
6870
fire_and_forget=fire_and_forget,
@@ -97,7 +99,12 @@ async def start_long_running_task(
9799
except asyncio.CancelledError:
98100
# remove the task, the client was disconnected
99101
if task_id:
100-
await lrt_api.remove_task(long_running_manager, task_context, task_id)
102+
await lrt_api.remove_task(
103+
long_running_manager.rpc_client,
104+
long_running_manager,
105+
task_context,
106+
task_id,
107+
)
101108
raise
102109

103110

@@ -142,6 +149,8 @@ def setup(
142149
router_prefix: str,
143150
redis_settings: RedisSettings,
144151
redis_namespace: RedisNamespace,
152+
rabbit_settings: RabbitSettings,
153+
rabbit_namespace: RabbitNamespace,
145154
handler_check_decorator: Callable = _no_ops_decorator,
146155
task_request_context_decorator: Callable = _no_task_context_decorator,
147156
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
@@ -170,7 +179,9 @@ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
170179
stale_task_check_interval=stale_task_check_interval,
171180
stale_task_detect_timeout=stale_task_detect_timeout,
172181
redis_settings=redis_settings,
182+
rabbit_settings=rabbit_settings,
173183
redis_namespace=redis_namespace,
184+
rabbit_namespace=rabbit_namespace,
174185
)
175186
)
176187

packages/service-library/tests/aiohttp/long_running_tasks/test_long_running_tasks.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,33 +23,34 @@
2323
from servicelib.aiohttp.rest_middlewares import append_rest_middlewares
2424
from servicelib.long_running_tasks.models import TaskGet, TaskId, TaskStatus
2525
from servicelib.long_running_tasks.task import TaskContext
26+
from settings_library.rabbit import RabbitSettings
2627
from settings_library.redis import RedisSettings
2728
from tenacity.asyncio import AsyncRetrying
2829
from tenacity.retry import retry_if_exception_type
2930
from tenacity.stop import stop_after_delay
3031
from tenacity.wait import wait_fixed
3132

3233
pytest_simcore_core_services_selection = [
33-
"redis",
34-
]
35-
36-
pytest_simcore_ops_services_selection = [
37-
"redis-commander",
34+
"rabbit",
3835
]
3936

4037

4138
@pytest.fixture
4239
def app(
43-
server_routes: web.RouteTableDef, redis_service: RedisSettings
40+
server_routes: web.RouteTableDef,
41+
use_in_memory_redis: RedisSettings,
42+
rabbit_service: RabbitSettings,
4443
) -> web.Application:
4544
app = web.Application()
4645
app.add_routes(server_routes)
4746
# this adds enveloping and error middlewares
4847
append_rest_middlewares(app, api_version="")
4948
long_running_tasks.server.setup(
5049
app,
51-
redis_settings=redis_service,
50+
redis_settings=use_in_memory_redis,
5251
redis_namespace="test",
52+
rabbit_settings=rabbit_service,
53+
rabbit_namespace="test",
5354
router_prefix="/futures",
5455
)
5556

@@ -109,7 +110,14 @@ async def test_workflow(
109110
("generated item", 0.8),
110111
("finished", 1.0),
111112
]
112-
assert all(x in progress_updates for x in EXPECTED_MESSAGES)
113+
async for attempt in AsyncRetrying(
114+
wait=wait_fixed(0.1),
115+
stop=stop_after_delay(10),
116+
reraise=True,
117+
retry=retry_if_exception_type(AssertionError),
118+
):
119+
with attempt:
120+
assert all(x in progress_updates for x in EXPECTED_MESSAGES)
113121
# now get the result
114122
result_url = client.app.router["get_task_result"].url_for(task_id=task_id)
115123
result = await client.get(f"{result_url}")

packages/service-library/tests/aiohttp/long_running_tasks/test_long_running_tasks_client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,20 @@
1515
long_running_task_request,
1616
)
1717
from servicelib.aiohttp.rest_middlewares import append_rest_middlewares
18+
from settings_library.rabbit import RabbitSettings
1819
from settings_library.redis import RedisSettings
1920
from yarl import URL
2021

22+
pytest_simcore_core_services_selection = [
23+
"rabbit",
24+
]
25+
2126

2227
@pytest.fixture
2328
def app(
24-
server_routes: web.RouteTableDef, use_in_memory_redis: RedisSettings
29+
server_routes: web.RouteTableDef,
30+
use_in_memory_redis: RedisSettings,
31+
rabbit_service: RabbitSettings,
2532
) -> web.Application:
2633
app = web.Application()
2734
app.add_routes(server_routes)
@@ -31,6 +38,8 @@ def app(
3138
app,
3239
redis_settings=use_in_memory_redis,
3340
redis_namespace="test",
41+
rabbit_settings=rabbit_service,
42+
rabbit_namespace="test",
3443
router_prefix="/futures",
3544
)
3645

@@ -58,7 +67,7 @@ async def test_long_running_task_request_raises_400(
5867
client: TestClient, long_running_task_url: URL
5968
):
6069
# missing parameters raises
61-
with pytest.raises(ClientResponseError):
70+
with pytest.raises(ClientResponseError): # noqa: PT012
6271
async for _ in long_running_task_request(
6372
client.session, long_running_task_url, None
6473
):
@@ -95,7 +104,7 @@ async def test_long_running_task_request_timeout(
95104
):
96105
assert client.app
97106
task: LRTask | None = None
98-
with pytest.raises(asyncio.TimeoutError):
107+
with pytest.raises(asyncio.TimeoutError): # noqa: PT012
99108
async for task in long_running_task_request(
100109
client.session,
101110
long_running_task_url.with_query(num_strings=10, sleep_time=1),

packages/service-library/tests/aiohttp/long_running_tasks/test_long_running_tasks_with_task_context.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@
2727
from servicelib.aiohttp.typing_extension import Handler
2828
from servicelib.long_running_tasks.models import TaskGet, TaskId
2929
from servicelib.long_running_tasks.task import TaskContext
30+
from settings_library.rabbit import RabbitSettings
3031
from settings_library.redis import RedisSettings
3132

3233
pytest_simcore_core_services_selection = [
33-
"redis",
34+
"rabbit",
3435
]
3536
# WITH TASK CONTEXT
3637
# NOTE: as the long running task framework may be used in any number of services
@@ -67,16 +68,19 @@ async def _test_task_context_decorator(
6768
def app_with_task_context(
6869
server_routes: web.RouteTableDef,
6970
task_context_decorator,
70-
redis_service: RedisSettings,
71+
use_in_memory_redis: RedisSettings,
72+
rabbit_service: RabbitSettings,
7173
) -> web.Application:
7274
app = web.Application()
7375
app.add_routes(server_routes)
7476
# this adds enveloping and error middlewares
7577
append_rest_middlewares(app, api_version="")
7678
long_running_tasks.server.setup(
7779
app,
78-
redis_settings=redis_service,
80+
redis_settings=use_in_memory_redis,
7981
redis_namespace="test",
82+
rabbit_settings=rabbit_service,
83+
rabbit_namespace="test",
8084
router_prefix="/futures_with_task_context",
8185
task_request_context_decorator=task_context_decorator,
8286
)

services/web/server/src/simcore_service_webserver/long_running_tasks.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,18 @@
1010
)
1111
from servicelib.aiohttp.long_running_tasks.server import setup
1212
from servicelib.aiohttp.typing_extension import Handler
13+
from servicelib.long_running_tasks.models import RabbitNamespace
1314
from servicelib.long_running_tasks.task import RedisNamespace
1415

15-
from . import redis
16+
from . import rabbitmq, redis
1617
from ._meta import API_VTAG
1718
from .login.decorators import login_required
1819
from .models import AuthenticatedRequestContext
1920

2021
_logger = logging.getLogger(__name__)
2122

22-
_LONG_RUNNING_TASKS_NAMESPACE: Final[RedisNamespace] = "webserver-legacy"
23+
_LRT_REDIS_NAMESPACE: Final[RedisNamespace] = "webserver-legacy"
24+
_LRT_RABBIT_NAMESPACE: Final[RabbitNamespace] = "webserver-legacy"
2325

2426

2527
def webserver_request_context_decorator(handler: Handler):
@@ -37,10 +39,13 @@ async def _test_task_context_decorator(
3739

3840
@ensure_single_setup(__name__, logger=_logger)
3941
def setup_long_running_tasks(app: web.Application) -> None:
42+
4043
setup(
4144
app,
4245
redis_settings=redis.get_plugin_settings(app),
43-
redis_namespace=_LONG_RUNNING_TASKS_NAMESPACE,
46+
rabbit_settings=rabbitmq.get_plugin_settings(app),
47+
redis_namespace=_LRT_REDIS_NAMESPACE,
48+
rabbit_namespace=_LRT_RABBIT_NAMESPACE,
4449
router_prefix=f"/{API_VTAG}/tasks-legacy",
4550
handler_check_decorator=login_required,
4651
task_request_context_decorator=webserver_request_context_decorator,

0 commit comments

Comments
 (0)