Skip to content

Commit 09f85c1

Browse files
GitHKAndrei Neagu
andauthored
✨adding way to start/track/resolve background tasks for FastAPI based services (ITISFoundation#3174)
* adds FastAPI based utilities to easily define long running tasks * adds utilities to easily query long running tasks and their result from an via the usage of an httpx client Co-authored-by: Andrei Neagu <[email protected]>
1 parent 2e85e01 commit 09f85c1

29 files changed

+1887
-25
lines changed

packages/service-library/requirements/_fastapi.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
async-asgi-testclient # replacement for fastapi.testclient.TestClient [see b) below]
1010
fastapi
1111
fastapi_contrib[jaegertracing]
12+
httpx
1213
uvicorn
1314

1415
# NOTE: What test client to use for fastapi-based apps?

packages/service-library/requirements/_fastapi.txt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55
# pip-compile --output-file=requirements/_fastapi.txt --strip-extras requirements/_fastapi.in
66
#
77
anyio==3.6.1
8-
# via starlette
8+
# via
9+
# httpcore
10+
# starlette
911
async-asgi-testclient==1.4.11
1012
# via -r requirements/_fastapi.in
1113
certifi==2022.6.15
12-
# via requests
14+
# via
15+
# httpcore
16+
# httpx
17+
# requests
1318
charset-normalizer==2.0.12
1419
# via requests
1520
click==8.1.3
@@ -20,12 +25,21 @@ fastapi==0.76.0
2025
# fastapi-contrib
2126
fastapi-contrib==0.2.11
2227
# via -r requirements/_fastapi.in
23-
h11==0.13.0
24-
# via uvicorn
28+
h11==0.12.0
29+
# via
30+
# httpcore
31+
# uvicorn
32+
httpcore==0.15.0
33+
# via httpx
34+
httpx==0.23.0
35+
# via
36+
# -c requirements/./../../../requirements/constraints.txt
37+
# -r requirements/_fastapi.in
2538
idna==3.3
2639
# via
2740
# anyio
2841
# requests
42+
# rfc3986
2943
jaeger-client==4.8.0
3044
# via fastapi-contrib
3145
multidict==6.0.2
@@ -41,10 +55,15 @@ pydantic==1.9.0
4155
# fastapi
4256
requests==2.28.0
4357
# via async-asgi-testclient
58+
rfc3986==1.5.0
59+
# via httpx
4460
six==1.16.0
4561
# via thrift
4662
sniffio==1.2.0
47-
# via anyio
63+
# via
64+
# anyio
65+
# httpcore
66+
# httpx
4867
starlette==0.18.0
4968
# via fastapi
5069
threadloop==1.0.2

packages/service-library/requirements/_test.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
--constraint _fastapi.txt
1212

1313
# testing
14+
asgi_lifespan
1415
coverage
1516
coveralls
1617
faker

packages/service-library/requirements/_test.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ aiosignal==1.2.0
1313
# via
1414
# -c requirements/_aiohttp.txt
1515
# aiohttp
16+
asgi-lifespan==1.0.1
17+
# via -r requirements/_test.in
1618
astroid==2.11.5
1719
# via pylint
1820
async-timeout==4.0.2
@@ -203,6 +205,10 @@ six==1.16.0
203205
# paramiko
204206
# python-dateutil
205207
# websocket-client
208+
sniffio==1.2.0
209+
# via
210+
# -c requirements/_fastapi.txt
211+
# asgi-lifespan
206212
termcolor==1.1.0
207213
# via pytest-sugar
208214
texttable==1.6.4

packages/service-library/src/servicelib/aiohttp/dev_error_logger.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
2-
from aiohttp.web import Application, middleware, Request, HTTPError
3-
from servicelib.aiohttp.typing_extension import Handler, Middleware
41
import logging
52
import traceback
63

4+
from aiohttp.web import Application, HTTPError, Request, middleware
5+
from servicelib.aiohttp.typing_extension import Handler, Middleware
6+
77
logger = logging.getLogger(__name__)
88

99
_SEP = "|||"

packages/service-library/src/servicelib/docker_compose.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import yaml
22

3-
43
# Notes on below env var names:
54
# - SIMCORE_REGISTRY will be replaced by the url of the simcore docker registry
65
# deployed inside the platform
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from . import client, server
2+
3+
__all__: tuple[str, ...] = (
4+
"client",
5+
"server",
6+
)
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
from typing import Any, Optional
2+
3+
from fastapi import FastAPI, status
4+
from httpx import AsyncClient
5+
from pydantic import AnyHttpUrl, BaseModel, PositiveFloat, parse_obj_as
6+
7+
from ._errors import GenericClientError, TaskClientResultError
8+
from ._models import TaskId, TaskResult, TaskStatus
9+
10+
11+
class ClientConfiguration(BaseModel):
12+
router_prefix: str
13+
default_timeout: PositiveFloat
14+
15+
16+
class Client:
17+
"""
18+
This is a client that aims to simplify the requests to get the
19+
status, result and/or cancel of a long running task.
20+
"""
21+
22+
def __init__(self, app: FastAPI, async_client: AsyncClient, base_url: AnyHttpUrl):
23+
"""
24+
`app`: used byt the `Client` to recover the `ClientConfiguration`
25+
`async_client`: an AsyncClient instance used by `Client`
26+
`base_url`: base endpoint where the server is listening on
27+
"""
28+
self.app = app
29+
self._async_client = async_client
30+
self._base_url = base_url
31+
32+
@property
33+
def _client_configuration(self) -> ClientConfiguration:
34+
return self.app.state.long_running_client_configuration
35+
36+
def _get_url(self, path: str) -> AnyHttpUrl:
37+
return parse_obj_as(
38+
AnyHttpUrl,
39+
f"{self._base_url}{self._client_configuration.router_prefix}{path}",
40+
)
41+
42+
async def get_task_status(
43+
self, task_id: TaskId, *, timeout: Optional[PositiveFloat] = None
44+
) -> TaskStatus:
45+
timeout = timeout or self._client_configuration.default_timeout
46+
result = await self._async_client.get(
47+
self._get_url(f"/task/{task_id}"),
48+
timeout=timeout,
49+
)
50+
if result.status_code != status.HTTP_200_OK:
51+
raise GenericClientError(
52+
action="getting_status",
53+
task_id=task_id,
54+
status=result.status_code,
55+
body=result.text,
56+
)
57+
58+
return TaskStatus.parse_obj(result.json())
59+
60+
async def get_task_result(
61+
self, task_id: TaskId, *, timeout: Optional[PositiveFloat] = None
62+
) -> Optional[Any]:
63+
timeout = timeout or self._client_configuration.default_timeout
64+
result = await self._async_client.get(
65+
self._get_url(f"/task/{task_id}/result"),
66+
timeout=timeout,
67+
)
68+
if result.status_code != status.HTTP_200_OK:
69+
raise GenericClientError(
70+
action="getting_result",
71+
task_id=task_id,
72+
status=result.status_code,
73+
body=result.text,
74+
)
75+
76+
task_result = TaskResult.parse_obj(result.json())
77+
if task_result.error is not None:
78+
raise TaskClientResultError(message=task_result.error)
79+
return task_result.result
80+
81+
async def cancel_and_delete_task(
82+
self, task_id: TaskId, *, timeout: Optional[PositiveFloat] = None
83+
) -> bool:
84+
timeout = timeout or self._client_configuration.default_timeout
85+
result = await self._async_client.delete(
86+
self._get_url(f"/task/{task_id}"),
87+
timeout=timeout,
88+
)
89+
if result.status_code != status.HTTP_200_OK:
90+
raise GenericClientError(
91+
action="cancelling_and_removing_task",
92+
task_id=task_id,
93+
status=result.status_code,
94+
body=result.text,
95+
)
96+
return result.json()
97+
98+
99+
def setup(
100+
app: FastAPI,
101+
*,
102+
router_prefix: str = "",
103+
http_requests_timeout: PositiveFloat = 15,
104+
):
105+
"""
106+
- `router_prefix` by default it is assumed the server mounts the APIs on
107+
`/task/...` this will assume the APIs are as following
108+
`{router_prefix}/task/...`
109+
- `http_requests_timeout` short requests are used to interact with the
110+
server API, a low timeout is sufficient
111+
"""
112+
113+
async def on_startup() -> None:
114+
app.state.long_running_client_configuration = ClientConfiguration(
115+
router_prefix=router_prefix, default_timeout=http_requests_timeout
116+
)
117+
118+
app.add_event_handler("startup", on_startup)
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import asyncio
2+
from asyncio.log import logger
3+
from contextlib import asynccontextmanager
4+
from typing import Any, AsyncIterator, Awaitable, Callable, Optional
5+
6+
from pydantic import PositiveFloat
7+
8+
from ._client import Client
9+
from ._errors import TaskClientTimeoutError
10+
from ._models import TaskId, TaskStatus
11+
12+
13+
class _ProgressManager:
14+
"""
15+
Avoids sending duplicate progress updates.
16+
17+
When polling the status, the same progress messages can arrive in a row.
18+
This allows the client to filter out the flood of messages when it subscribes
19+
for progress updates.
20+
"""
21+
22+
def __init__(
23+
self, update_callback: Optional[Callable[[str, float], Awaitable[None]]]
24+
) -> None:
25+
self._callback = update_callback
26+
self._last_message: Optional[str] = None
27+
self._last_percent: Optional[float] = None
28+
29+
async def update(
30+
self, *, message: Optional[str] = None, percent: Optional[float] = None
31+
) -> None:
32+
if self._callback is None:
33+
return
34+
35+
has_changes = False
36+
37+
if message is not None and self._last_message != message:
38+
self._last_message = message
39+
has_changes = True
40+
if percent is not None and self._last_percent != percent:
41+
self._last_percent = percent
42+
has_changes = True
43+
44+
if has_changes:
45+
await self._callback(self._last_message, self._last_percent)
46+
47+
48+
@asynccontextmanager
49+
async def periodic_task_result(
50+
client: Client,
51+
task_id: TaskId,
52+
*,
53+
task_timeout: PositiveFloat,
54+
progress_callback: Optional[Callable[[str, float], Awaitable[None]]] = None,
55+
status_poll_interval: PositiveFloat = 5,
56+
) -> AsyncIterator[Optional[Any]]:
57+
"""
58+
A convenient wrapper around the Client. Polls for results and returns them
59+
once available.
60+
61+
Parameters:
62+
- `client`: an instance of `long_running_tasks.client.Client`
63+
- `task_timeout`: when this expires the task will be cancelled and
64+
removed form the server
65+
- `progress` optional: user defined awaitable with two positional arguments:
66+
* first argument `message`, type `str`
67+
* second argument `percent`, type `float` between [0.0, 1.0]
68+
- `status_poll_interval` optional: when waiting for a task to finish,
69+
how frequent should the server be queried
70+
71+
raises: `TaskClientResultError` if the task finished with an error instead of
72+
the expected result
73+
raises: `asyncio.TimeoutError` NOTE: the remote task will also be removed
74+
"""
75+
progress_manager = _ProgressManager(progress_callback)
76+
77+
async def _status_update() -> TaskStatus:
78+
task_status = await client.get_task_status(task_id)
79+
logger.info("Task status %s", task_status.json())
80+
await progress_manager.update(
81+
message=task_status.task_progress.message,
82+
percent=task_status.task_progress.percent,
83+
)
84+
return task_status
85+
86+
async def _wait_task_completion() -> None:
87+
task_status = await _status_update()
88+
while not task_status.done:
89+
await asyncio.sleep(status_poll_interval)
90+
task_status = await _status_update()
91+
92+
try:
93+
await asyncio.wait_for(_wait_task_completion(), timeout=task_timeout)
94+
95+
result: Optional[Any] = await client.get_task_result(task_id)
96+
yield result
97+
except asyncio.TimeoutError as e:
98+
task_removed = await client.cancel_and_delete_task(task_id)
99+
raise TaskClientTimeoutError(
100+
task_id=task_id,
101+
timeout=task_timeout,
102+
exception=e,
103+
task_removed=task_removed,
104+
) from e
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from fastapi import Request
2+
3+
from ._task import TaskManager
4+
5+
6+
def get_task_manager(request: Request) -> TaskManager:
7+
return request.app.state.long_running_task_manager

0 commit comments

Comments
 (0)