Skip to content

Commit c604d7c

Browse files
authored
✨ Aiohttp long running tasks client (ITISFoundation#3290)
1 parent 6f7a4e1 commit c604d7c

File tree

9 files changed

+357
-91
lines changed

9 files changed

+357
-91
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
1+
import asyncio
12
import logging
23
from functools import wraps
3-
from typing import AsyncGenerator, Callable
4+
from typing import Any, AsyncGenerator, Callable
45

56
from aiohttp import web
67
from pydantic import PositiveFloat
7-
8-
from ...long_running_tasks._task import TasksManager
8+
from servicelib.json_serialization import json_dumps
9+
10+
from ...long_running_tasks._models import TaskGet
11+
from ...long_running_tasks._task import (
12+
TaskContext,
13+
TaskProtocol,
14+
TasksManager,
15+
start_task,
16+
)
917
from ..typing_extension import Handler
1018
from ._constants import (
1119
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
1220
MINUTE,
1321
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
1422
)
23+
from ._dependencies import create_task_name_from_request, get_tasks_manager
1524
from ._error_handlers import base_long_running_error_handler
1625
from ._routes import routes
1726

@@ -31,6 +40,49 @@ async def _wrap(request: web.Request):
3140
return _wrap
3241

3342

43+
async def start_long_running_task(
    request: web.Request,
    task: TaskProtocol,
    *,
    task_context: TaskContext,
    **task_kwargs: Any,
) -> web.Response:
    """Start ``task`` as a long running task and reply with its tracking links.

    The task is registered in the tasks manager of ``request.app`` under a
    name derived from the request. The reply is a ``202 Accepted`` JSON
    response whose envelope ``{"data": TaskGet}`` carries the task id, the
    task name and the status/result/abort hrefs resolved from the
    application router.

    Arguments:
        request: the incoming request that triggers the task
        task: the callable to run as a long running task
        task_context: context forwarded to ``start_task``
        **task_kwargs: extra keyword arguments forwarded to ``start_task``

    Raises:
        asyncio.CancelledError: re-raised after the already started task
            (if any) was cancelled through the tasks manager
    """
    task_manager = get_tasks_manager(request.app)
    task_name = create_task_name_from_request(request)
    task_id = None
    try:
        task_id = start_task(
            task_manager,
            task,
            task_context=task_context,
            task_name=task_name,
            **task_kwargs,
        )
        router = request.app.router
        status_url = router["get_task_status"].url_for(task_id=task_id)
        result_url = router["get_task_result"].url_for(task_id=task_id)
        abort_url = router["cancel_and_delete_task"].url_for(task_id=task_id)
        task_get = TaskGet(
            task_id=task_id,
            task_name=task_name,
            status_href=f"{status_url}",
            result_href=f"{result_url}",
            abort_href=f"{abort_url}",
        )
        return web.json_response(
            data={"data": task_get},
            status=web.HTTPAccepted.status_code,
            dumps=json_dumps,
        )
    except asyncio.CancelledError:
        # cancel the task, the client has disconnected
        if task_id:
            # the manager bound at the top of the function is reused here
            # instead of being looked up a second time
            await task_manager.cancel_task(task_id, with_task_context=None)
        raise
84+
85+
3486
def setup(
3587
app: web.Application,
3688
*,
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Any, AsyncGenerator, Coroutine, Final, Optional
4+
5+
from aiohttp import ClientConnectionError, ClientSession, web
6+
from pydantic import Json
7+
from tenacity import TryAgain, retry
8+
from tenacity._asyncio import AsyncRetrying
9+
from tenacity.retry import retry_if_exception_type
10+
from tenacity.stop import stop_after_delay
11+
from tenacity.wait import wait_random_exponential
12+
from yarl import URL
13+
14+
from ..rest_responses import unwrap_envelope
15+
from .server import TaskGet, TaskId, TaskProgress, TaskStatus
16+
17+
# type of the body sent along with the request that starts the task
RequestBody = Json

# durations expressed in seconds
_MINUTE: Final[int] = 60
_HOUR: Final[int] = 60 * _MINUTE
# pause between two status polls when the response has no "retry-after" header
_DEFAULT_POLL_INTERVAL_S: Final[float] = 1
# tenacity keyword arguments shared by the request helpers of this module:
# retry only on connection errors, with a randomized exponential wait capped
# at 20, give up after a delay of 30 and re-raise the last error
_DEFAULT_AIOHTTP_RETRY_POLICY = {
    "retry": retry_if_exception_type(ClientConnectionError),
    "wait": wait_random_exponential(max=20),
    "stop": stop_after_delay(30),
    "reraise": True,
}
28+
29+
30+
@retry(**_DEFAULT_AIOHTTP_RETRY_POLICY)
async def _start(
    session: ClientSession, url: URL, json: Optional[RequestBody]
) -> TaskGet:
    """POST ``json`` to ``url`` and parse the enveloped reply as a ``TaskGet``.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status; the call is retried according to the module retry policy.
    """
    async with session.post(url, json=json) as response:
        response.raise_for_status()
        envelope = await response.json()
        data, error = unwrap_envelope(envelope)
        assert not error  # nosec
        assert data is not None  # nosec
        return TaskGet.parse_obj(data)
41+
42+
43+
@retry(**_DEFAULT_AIOHTTP_RETRY_POLICY)
async def _wait_for_completion(
    session: ClientSession,
    task_id: TaskId,
    status_url: URL,
    wait_timeout_s: int,
) -> AsyncGenerator[TaskProgress, None]:
    """Poll ``status_url`` until the task is done, yielding each progress.

    After every status request the reported ``task_progress`` is yielded.
    While the task is not done, the generator sleeps for the number of
    seconds given by the ``retry-after`` response header (or the default
    poll interval when the header is missing or is not a plain number) and
    polls again.

    Arguments:
        session: the client session used for the status requests
        task_id: identifier of the task, only used in messages
        status_url: URL returning the enveloped ``TaskStatus``
        wait_timeout_s: maximum time in seconds to keep polling

    Raises:
        asyncio.TimeoutError: the task was still not done once
            ``wait_timeout_s`` elapsed
    """
    # NOTE(review): this function is an async generator, so the ``@retry``
    # decorator above presumably only guards the creation of the generator
    # object and not the requests done while iterating - confirm with tenacity
    try:
        async for attempt in AsyncRetrying(
            stop=stop_after_delay(wait_timeout_s),
            reraise=True,
            retry=retry_if_exception_type(TryAgain),
        ):
            with attempt:
                async with session.get(status_url) as response:
                    response.raise_for_status()
                    data, error = unwrap_envelope(await response.json())
                    assert not error  # nosec
                    assert data is not None  # nosec
                    task_status = TaskStatus.parse_obj(data)
                    yield task_status.task_progress
                    if not task_status.done:
                        try:
                            poll_interval_s = float(
                                response.headers.get("retry-after")
                            )
                        except (TypeError, ValueError):
                            # header is missing (None) or carries something
                            # else than a number of seconds, e.g. an HTTP-date
                            poll_interval_s = float(_DEFAULT_POLL_INTERVAL_S)
                        await asyncio.sleep(poll_interval_s)
                        # signals AsyncRetrying to run another poll
                        raise TryAgain(
                            f"{task_id=}, {task_status.started=} has "
                            f"status: '{task_status.task_progress.message}'"
                            f" {task_status.task_progress.percent}%"
                        )
    except TryAgain as exc:
        # this is a timeout
        raise asyncio.TimeoutError(
            f"Long running task {task_id}, calling to {status_url} timed-out after {wait_timeout_s} seconds"
        ) from exc
83+
84+
85+
@retry(**_DEFAULT_AIOHTTP_RETRY_POLICY)
async def _task_result(session: ClientSession, result_url: URL) -> Any:
    """GET ``result_url`` and return the ``data`` part of the enveloped reply.

    Returns:
        the unwrapped result, or ``None`` when the server answers with
        ``204 No Content``

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status; the call is retried according to the module retry policy.
    """
    async with session.get(result_url) as response:
        response.raise_for_status()
        if response.status == web.HTTPNoContent.status_code:
            return None
        data, error = unwrap_envelope(await response.json())
        assert not error  # nosec
        # falsy results ([], 0, "", False) are legitimate task results,
        # only a missing payload is unexpected here
        assert data is not None  # nosec
        return data
94+
95+
96+
@retry(**_DEFAULT_AIOHTTP_RETRY_POLICY)
async def _abort_task(session: ClientSession, abort_url: URL) -> None:
    """Send a DELETE to ``abort_url`` and check that the reply is empty.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status; the call is retried according to the module retry policy.
    """
    async with session.delete(abort_url) as response:
        response.raise_for_status()
        envelope = await response.json()
        data, error = unwrap_envelope(envelope)
        # neither an error nor a payload is expected back
        assert not error  # nosec
        assert not data  # nosec
103+
104+
105+
@dataclass(frozen=True)
class LRTask:
    """Snapshot of a long running task as seen by the client.

    ``progress`` holds the reported progress; ``_result`` is only set once
    the result can be fetched.
    """

    progress: TaskProgress
    # coroutine fetching the result; a coroutine object can be awaited only once
    _result: Optional[Coroutine[Any, Any, Any]] = None

    def done(self) -> bool:
        """Tell whether a result coroutine was attached to this snapshot."""
        return self._result is not None

    async def result(self) -> Any:
        """Await the attached coroutine and return the task result.

        Raises:
            ValueError: no result coroutine is attached
        """
        if self._result:
            return await self._result
        raise ValueError("No result ready!")
117+
118+
119+
async def long_running_task_request(
    session: ClientSession,
    url: URL,
    json: Optional[RequestBody] = None,
    client_timeout: int = 1 * _HOUR,
) -> AsyncGenerator[LRTask, None]:
    """Will use the passed `ClientSession` to call an oSparc long
    running task `url` passing `json` as request body.

    Yields one `LRTask` per polled progress and, once polling ends, a final
    `LRTask` carrying the last progress together with the coroutine that
    fetches the result.
    On cancellation or time-out the started task (if any) is aborted on the
    server before the error is re-raised.
    NOTE: this follows the usual aiohttp client syntax, and will raise the same errors

    Raises:
        [https://docs.aiohttp.org/en/stable/client_reference.html#hierarchy-of-exceptions]
    """
    started_task = None
    try:
        started_task = await _start(session, url, json)
        status_url = url.with_path(started_task.status_href, encoded=True)
        latest_progress = None
        # the loop variable keeps the last polled progress after the loop
        async for latest_progress in _wait_for_completion(
            session, started_task.task_id, status_url, client_timeout
        ):
            yield LRTask(progress=latest_progress)
        assert latest_progress  # nosec
        result_url = url.with_path(started_task.result_href, encoded=True)
        yield LRTask(
            progress=latest_progress,
            _result=_task_result(session, result_url),
        )

    except (asyncio.CancelledError, asyncio.TimeoutError):
        if started_task:
            abort_url = url.with_path(started_task.abort_href, encoded=True)
            await _abort_task(session, abort_url)
        raise

packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,16 @@
1313
TaskProtocol,
1414
TasksManager,
1515
TaskStatus,
16-
start_task,
1716
)
1817
from ._dependencies import create_task_name_from_request, get_tasks_manager
1918
from ._routes import TaskGet
20-
from ._server import setup
19+
from ._server import setup, start_long_running_task
2120

2221
__all__: tuple[str, ...] = (
2322
"create_task_name_from_request",
2423
"get_tasks_manager",
2524
"setup",
26-
"start_task",
25+
"start_long_running_task",
2726
"TaskAlreadyRunningError",
2827
"TaskCancelledError",
2928
"TaskId",

packages/service-library/src/servicelib/long_running_tasks/_task.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,4 +422,5 @@ async def _progress_task(progress: TaskProgress, handler: TaskProtocol):
422422
task_progress=task_progress,
423423
task_context=task_context or {},
424424
)
425+
425426
return tracked_task.task_id

packages/service-library/tests/aiohttp/long_running_tasks/conftest.py

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
# pylint: disable=unused-variable
44

55
import asyncio
6-
import urllib.parse
76
from typing import Awaitable, Callable
87

98
import pytest
@@ -13,12 +12,8 @@
1312
from pydantic import BaseModel, parse_obj_as
1413
from pytest_simcore.helpers.utils_assert import assert_status
1514
from servicelib.aiohttp import long_running_tasks
16-
from servicelib.aiohttp.long_running_tasks.server import (
17-
TaskId,
18-
create_task_name_from_request,
19-
)
15+
from servicelib.aiohttp.long_running_tasks.server import TaskId
2016
from servicelib.aiohttp.requests_validation import parse_request_query_parameters_as
21-
from servicelib.json_serialization import json_dumps
2217
from servicelib.long_running_tasks._task import TaskContext
2318
from tenacity._asyncio import AsyncRetrying
2419
from tenacity.retry import retry_if_exception_type
@@ -68,26 +63,15 @@ class _LongTaskQueryParams(BaseModel):
6863
fail: bool = False
6964

7065
@routes.post("/long_running_task:start", name=long_running_task_entrypoint)
71-
async def generate_list_strings(request: web.Request):
72-
task_manager = long_running_tasks.server.get_tasks_manager(request.app)
66+
async def generate_list_strings(request: web.Request) -> web.Response:
7367
query_params = parse_request_query_parameters_as(_LongTaskQueryParams, request)
74-
assert task_manager, "task manager is not initiated!"
75-
task_name = create_task_name_from_request(request)
76-
task_id = long_running_tasks.server.start_task(
77-
task_manager,
68+
return await long_running_tasks.server.start_long_running_task(
69+
request,
7870
_string_list_task,
7971
num_strings=query_params.num_strings,
8072
sleep_time=query_params.sleep_time,
8173
fail=query_params.fail,
8274
task_context=task_context,
83-
task_name=task_name,
84-
)
85-
assert task_id
86-
assert task_id.startswith(urllib.parse.quote(task_name, safe=""))
87-
return web.json_response(
88-
data={"data": task_id},
89-
status=web.HTTPAccepted.status_code,
90-
dumps=json_dumps,
9175
)
9276

9377
return routes
@@ -108,8 +92,8 @@ async def _caller(client: TestClient, **query_kwargs) -> TaskId:
10892
data, error = await assert_status(resp, web.HTTPAccepted)
10993
assert data
11094
assert not error
111-
task_id = parse_obj_as(long_running_tasks.server.TaskId, data)
112-
return task_id
95+
task_get = parse_obj_as(long_running_tasks.server.TaskGet, data)
96+
return task_get.task_id
11397

11498
return _caller
11599

packages/service-library/tests/aiohttp/long_running_tasks/test_long_running_tasks.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
from aiohttp import web
1919
from aiohttp.test_utils import TestClient
2020
from pydantic import parse_obj_as
21-
22-
# TESTS
2321
from pytest_simcore.helpers.utils_assert import assert_status
2422
from servicelib.aiohttp import long_running_tasks
2523
from servicelib.aiohttp.long_running_tasks.server import TaskGet, TaskId

0 commit comments

Comments
 (0)