Skip to content

Commit 7f24c65

Browse files
author
Andrei Neagu
committed
fixed fastapi tests
1 parent 08f82cc commit 7f24c65

File tree

8 files changed

+141
-16
lines changed

8 files changed

+141
-16
lines changed

packages/service-library/src/servicelib/fastapi/long_running_tasks/_manager.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import datetime
22

33
from fastapi import FastAPI
4+
from settings_library.rabbit import RabbitSettings
45
from settings_library.redis import RedisSettings
56

7+
from ...long_running_tasks import lrt_api
68
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
9+
from ...long_running_tasks.models import RabbitNamespace
710
from ...long_running_tasks.task import RedisNamespace, TasksManager
11+
from ...rabbitmq._client_rpc import RabbitMQRPCClient
812

913

1014
class FastAPILongRunningManager(BaseLongRunningManager):
@@ -14,7 +18,9 @@ def __init__(
1418
stale_task_check_interval: datetime.timedelta,
1519
stale_task_detect_timeout: datetime.timedelta,
1620
redis_settings: RedisSettings,
21+
rabbit_settings: RabbitSettings,
1722
redis_namespace: RedisNamespace,
23+
rabbit_namespace: RabbitNamespace,
1824
):
1925
self._app = app
2026
self._tasks_manager = TasksManager(
@@ -23,13 +29,48 @@ def __init__(
2329
redis_settings=redis_settings,
2430
redis_namespace=redis_namespace,
2531
)
32+
self._rabbit_namespace = rabbit_namespace
33+
self.rabbit_settings = rabbit_settings
34+
self._rpc_server: RabbitMQRPCClient | None = None
35+
self._rpc_client: RabbitMQRPCClient | None = None
2636

2737
@property
2838
def tasks_manager(self) -> TasksManager:
2939
return self._tasks_manager
3040

41+
@property
42+
def rpc_server(self) -> RabbitMQRPCClient:
43+
assert self._rpc_server is not None # nosec
44+
return self._rpc_server
45+
46+
@property
47+
def rpc_client(self) -> RabbitMQRPCClient:
48+
assert self._rpc_client is not None # nosec
49+
return self._rpc_client
50+
51+
@property
52+
def rabbit_namespace(self) -> str:
53+
return self._rabbit_namespace
54+
3155
async def setup(self) -> None:
3256
await self._tasks_manager.setup()
57+
self._rpc_server = await RabbitMQRPCClient.create(
58+
client_name=f"lrt-server-{self.rabbit_namespace}",
59+
settings=self.rabbit_settings,
60+
)
61+
self._rpc_client = await RabbitMQRPCClient.create(
62+
client_name=f"lrt-client-{self.rabbit_namespace}",
63+
settings=self.rabbit_settings,
64+
)
65+
await lrt_api.register_rabbit_routes(self)
3366

3467
async def teardown(self) -> None:
3568
await self._tasks_manager.teardown()
69+
70+
if self._rpc_server is not None:
71+
await self._rpc_server.close()
72+
self._rpc_server = None
73+
74+
if self._rpc_client is not None:
75+
await self._rpc_client.close()
76+
self._rpc_client = None

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ async def list_tasks(
2727
result_href=str(request.url_for("get_task_result", task_id=t.task_id)),
2828
abort_href=str(request.url_for("remove_task", task_id=t.task_id)),
2929
)
30-
for t in await lrt_api.list_tasks(long_running_manager, task_context={})
30+
for t in await lrt_api.list_tasks(
31+
long_running_manager.rpc_client, long_running_manager, task_context={}
32+
)
3133
]
3234

3335

@@ -48,7 +50,10 @@ async def get_task_status(
4850
) -> TaskStatus:
4951
assert request # nosec
5052
return await lrt_api.get_task_status(
51-
long_running_manager, task_context={}, task_id=task_id
53+
long_running_manager.rpc_client,
54+
long_running_manager,
55+
task_context={},
56+
task_id=task_id,
5257
)
5358

5459

@@ -71,7 +76,10 @@ async def get_task_result(
7176
) -> TaskResult | Any:
7277
assert request # nosec
7378
return await lrt_api.get_task_result(
74-
long_running_manager, task_context={}, task_id=task_id
79+
long_running_manager.rpc_client,
80+
long_running_manager,
81+
task_context={},
82+
task_id=task_id,
7583
)
7684

7785

@@ -93,4 +101,9 @@ async def remove_task(
93101
],
94102
) -> None:
95103
assert request # nosec
96-
await lrt_api.remove_task(long_running_manager, task_context={}, task_id=task_id)
104+
await lrt_api.remove_task(
105+
long_running_manager.rpc_client,
106+
long_running_manager,
107+
task_context={},
108+
task_id=task_id,
109+
)

packages/service-library/src/servicelib/fastapi/long_running_tasks/_server.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import datetime
22

33
from fastapi import APIRouter, FastAPI
4+
from settings_library.rabbit import RabbitSettings
45
from settings_library.redis import RedisSettings
56

67
from ...long_running_tasks.constants import (
78
DEFAULT_STALE_TASK_CHECK_INTERVAL,
89
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
910
)
1011
from ...long_running_tasks.errors import BaseLongRunningError
12+
from ...long_running_tasks.models import RabbitNamespace
1113
from ...long_running_tasks.task import RedisNamespace
1214
from ._error_handlers import base_long_running_error_handler
1315
from ._manager import FastAPILongRunningManager
@@ -20,6 +22,8 @@ def setup(
2022
router_prefix: str = "",
2123
redis_settings: RedisSettings,
2224
redis_namespace: RedisNamespace,
25+
rabbit_settings: RabbitSettings,
26+
rabbit_namespace: RabbitNamespace,
2327
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
2428
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
2529
) -> None:
@@ -47,6 +51,8 @@ async def on_startup() -> None:
4751
stale_task_detect_timeout=stale_task_detect_timeout,
4852
redis_settings=redis_settings,
4953
redis_namespace=redis_namespace,
54+
rabbit_settings=rabbit_settings,
55+
rabbit_namespace=rabbit_namespace,
5056
)
5157
)
5258
await long_running_manager.setup()

packages/service-library/src/servicelib/long_running_tasks/_rabbit/lrt_client.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import logging
2-
from typing import Any
2+
from datetime import timedelta
3+
from typing import Any, Final
34

45
from models_library.rabbitmq_basic_types import RPCMethodName
5-
from pydantic import TypeAdapter
6+
from pydantic import PositiveInt, TypeAdapter
67

78
from ...logging_utils import log_decorator
89
from ...long_running_tasks.task import RegisteredTaskName
@@ -12,6 +13,13 @@
1213

1314
_logger = logging.getLogger(__name__)
1415

16+
_RPC_TIMEOUT_VERY_LONG_REQUEST: Final[PositiveInt] = int(
17+
timedelta(minutes=60).total_seconds()
18+
)
19+
_RPC_TIMEOUT_NORMAL_REQUEST: Final[PositiveInt] = int(
20+
timedelta(seconds=30).total_seconds()
21+
)
22+
1523

1624
@log_decorator(_logger, level=logging.DEBUG)
1725
async def start_task(
@@ -34,6 +42,7 @@ async def start_task(
3442
task_name=task_name,
3543
fire_and_forget=fire_and_forget,
3644
**task_kwargs,
45+
timeout_s=_RPC_TIMEOUT_NORMAL_REQUEST,
3746
)
3847
assert isinstance(result, TaskId) # nosec
3948
return result
@@ -50,9 +59,9 @@ async def list_tasks(
5059
get_namespace(namespace),
5160
TypeAdapter(RPCMethodName).validate_python("list_tasks"),
5261
task_context=task_context,
62+
timeout_s=_RPC_TIMEOUT_NORMAL_REQUEST,
5363
)
54-
assert TypeAdapter(list[TaskBase]).validate_python(result) # nosec
55-
return result
64+
return TypeAdapter(list[TaskBase]).validate_python(result)
5665

5766

5867
@log_decorator(_logger, level=logging.DEBUG)
@@ -68,6 +77,7 @@ async def get_task_status(
6877
TypeAdapter(RPCMethodName).validate_python("get_task_status"),
6978
task_context=task_context,
7079
task_id=task_id,
80+
timeout_s=_RPC_TIMEOUT_NORMAL_REQUEST,
7181
)
7282
assert isinstance(result, TaskStatus) # nosec
7383
return result
@@ -86,6 +96,7 @@ async def get_task_result(
8696
TypeAdapter(RPCMethodName).validate_python("get_task_result"),
8797
task_context=task_context,
8898
task_id=task_id,
99+
timeout_s=_RPC_TIMEOUT_NORMAL_REQUEST,
89100
)
90101

91102

@@ -104,5 +115,6 @@ async def remove_task(
104115
task_context=task_context,
105116
task_id=task_id,
106117
reraise_errors=reraise_errors,
118+
timeout_s=_RPC_TIMEOUT_VERY_LONG_REQUEST,
107119
)
108120
assert result is None # nosec

packages/service-library/tests/fastapi/long_running_tasks/conftest.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@
99
from fastapi import FastAPI
1010
from httpx import ASGITransport, AsyncClient
1111
from servicelib.fastapi import long_running_tasks
12+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
13+
from settings_library.rabbit import RabbitSettings
1214
from settings_library.redis import RedisSettings
1315

1416

1517
@pytest.fixture
16-
async def bg_task_app(router_prefix: str, redis_service: RedisSettings) -> FastAPI:
18+
async def bg_task_app(
19+
router_prefix: str, redis_service: RedisSettings, rabbit_service: RabbitSettings
20+
) -> FastAPI:
1721
app = FastAPI()
1822

1923
long_running_tasks.server.setup(
2024
app,
2125
redis_settings=redis_service,
2226
redis_namespace="test",
27+
rabbit_settings=rabbit_service,
28+
rabbit_namespace="test",
2329
router_prefix=router_prefix,
2430
)
2531
return app
@@ -33,3 +39,14 @@ async def async_client(bg_task_app: FastAPI) -> AsyncIterable[AsyncClient]:
3339
headers={"Content-Type": "application/json"},
3440
) as client:
3541
yield client
42+
43+
44+
@pytest.fixture
45+
async def rabbitmq_rpc_client(
46+
rabbit_service: RabbitSettings,
47+
) -> AsyncIterable[RabbitMQRPCClient]:
48+
client = await RabbitMQRPCClient.create(
49+
client_name="test-lrt-rpc-client", settings=rabbit_service
50+
)
51+
yield client
52+
await client.close()

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
@pytest.mark.parametrize(
99
"error_class, error_args",
1010
[
11-
(HTTPError, dict(message="")),
11+
(HTTPError, {"message": ""}),
1212
],
1313
)
1414
async def test_retry_on_errors(

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,18 @@
3333
TaskStatus,
3434
)
3535
from servicelib.long_running_tasks.task import TaskContext, TaskRegistry
36+
from settings_library.rabbit import RabbitSettings
3637
from settings_library.redis import RedisSettings
3738
from tenacity.asyncio import AsyncRetrying
3839
from tenacity.retry import retry_if_exception_type
3940
from tenacity.stop import stop_after_delay
4041
from tenacity.wait import wait_fixed
4142
from yarl import URL
4243

44+
pytest_simcore_core_services_selection = [
45+
"rabbit",
46+
]
47+
4348
ITEM_PUBLISH_SLEEP: Final[float] = 0.1
4449

4550

@@ -81,6 +86,7 @@ async def create_string_list_task(
8186
fail: bool = False,
8287
) -> TaskId:
8388
return await lrt_api.start_task(
89+
long_running_manager.rpc_client,
8490
long_running_manager,
8591
_string_list_task.__name__,
8692
num_strings=num_strings,
@@ -93,12 +99,20 @@ async def create_string_list_task(
9399

94100
@pytest.fixture
95101
async def app(
96-
server_routes: APIRouter, use_in_memory_redis: RedisSettings
102+
server_routes: APIRouter,
103+
use_in_memory_redis: RedisSettings,
104+
rabbit_service: RabbitSettings,
97105
) -> AsyncIterator[FastAPI]:
98106
# overrides fastapi/conftest.py:app
99107
app = FastAPI(title="test app")
100108
app.include_router(server_routes)
101-
setup_server(app, redis_settings=use_in_memory_redis, redis_namespace="test")
109+
setup_server(
110+
app,
111+
redis_settings=use_in_memory_redis,
112+
redis_namespace="test",
113+
rabbit_settings=rabbit_service,
114+
rabbit_namespace="test",
115+
)
102116
setup_client(app)
103117
async with LifespanManager(app, startup_timeout=30, shutdown_timeout=30):
104118
yield app
@@ -188,7 +202,15 @@ async def test_workflow(
188202
("generated item", 0.8),
189203
("finished", 1.0),
190204
]
191-
assert all(x in progress_updates for x in EXPECTED_MESSAGES)
205+
206+
async for attempt in AsyncRetrying(
207+
wait=wait_fixed(0.1),
208+
stop=stop_after_delay(10),
209+
reraise=True,
210+
retry=retry_if_exception_type(AssertionError),
211+
):
212+
with attempt:
213+
assert all(x in progress_updates for x in EXPECTED_MESSAGES)
192214
# now check the result
193215
result_url = app.url_path_for("get_task_result", task_id=task_id)
194216
result = await client.get(f"{result_url}")

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,13 @@
2828
TaskProgress,
2929
)
3030
from servicelib.long_running_tasks.task import TaskRegistry
31+
from settings_library.rabbit import RabbitSettings
3132
from settings_library.redis import RedisSettings
3233

34+
pytest_simcore_core_services_selection = [
35+
"rabbit",
36+
]
37+
3338
TASK_SLEEP_INTERVAL: Final[PositiveFloat] = 0.1
3439

3540
# UTILS
@@ -71,7 +76,9 @@ async def create_task_user_defined_route(
7176
FastAPILongRunningManager, Depends(get_long_running_manager)
7277
],
7378
) -> TaskId:
74-
return await lrt_api.start_task(long_running_manager, a_test_task.__name__)
79+
return await lrt_api.start_task(
80+
long_running_manager.rpc_client, long_running_manager, a_test_task.__name__
81+
)
7582

7683
@router.get("/api/failing", status_code=status.HTTP_200_OK)
7784
async def create_task_which_fails(
@@ -80,15 +87,20 @@ async def create_task_which_fails(
8087
],
8188
) -> TaskId:
8289
return await lrt_api.start_task(
83-
long_running_manager, a_failing_test_task.__name__
90+
long_running_manager.rpc_client,
91+
long_running_manager,
92+
a_failing_test_task.__name__,
8493
)
8594

8695
return router
8796

8897

8998
@pytest.fixture
9099
async def bg_task_app(
91-
user_routes: APIRouter, router_prefix: str, use_in_memory_redis: RedisSettings
100+
user_routes: APIRouter,
101+
router_prefix: str,
102+
use_in_memory_redis: RedisSettings,
103+
rabbit_service: RabbitSettings,
92104
) -> AsyncIterable[FastAPI]:
93105
app = FastAPI()
94106

@@ -99,6 +111,8 @@ async def bg_task_app(
99111
router_prefix=router_prefix,
100112
redis_settings=use_in_memory_redis,
101113
redis_namespace="test",
114+
rabbit_settings=rabbit_service,
115+
rabbit_namespace="test",
102116
)
103117
setup_client(app, router_prefix=router_prefix)
104118

0 commit comments

Comments
 (0)