Skip to content

Commit f4de3a8

Browse files
authored
Merge branch 'master' into enh/handle-non-existing-services
2 parents cd5f7c2 + c7b7878 commit f4de3a8

File tree

38 files changed

+514
-511
lines changed

38 files changed

+514
-511
lines changed

api/specs/web-server/_long_running_tasks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def cancel_async_job(
6565

6666
@router.get(
6767
"/tasks/{task_id}/result",
68+
response_model=Any,
6869
name="get_task_result",
6970
description="Retrieves the result of a task",
7071
responses=_export_data_responses,

api/specs/web-server/_long_running_tasks_legacy.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# pylint: disable=too-many-arguments
55

66

7-
from typing import Annotated
7+
from typing import Annotated, Any
88

99
from fastapi import APIRouter, Depends, status
1010
from models_library.generics import Envelope
@@ -54,6 +54,7 @@ def cancel_and_delete_task(
5454
@router.get(
5555
"/{task_id}/result",
5656
name="get_task_result",
57+
response_model=Any,
5758
description="Retrieves the result of a task",
5859
)
5960
def get_task_result(

packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ class TaskResult(BaseModel):
1818
error: Any | None
1919

2020

21-
class TaskGet(BaseModel):
21+
class TaskBase(BaseModel):
2222
task_id: TaskId
2323
task_name: str
24+
25+
26+
class TaskGet(TaskBase):
2427
status_href: str
2528
result_href: str
2629
abort_href: str

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from pydantic import PositiveFloat
44

55
MINUTE: Final[PositiveFloat] = 60
6-
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[
7-
str
8-
] = f"{__name__ }.long_running_tasks.tasks_manager"
9-
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[
10-
str
11-
] = f"{__name__}.long_running_tasks.context"
6+
APP_LONG_RUNNING_MANAGER_KEY: Final[str] = (
7+
f"{__name__ }.long_running_tasks.tasks_manager"
8+
)
9+
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[str] = (
10+
f"{__name__}.long_running_tasks.context"
11+
)

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_dependencies.py

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import datetime
2+
3+
from aiohttp import web
4+
5+
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
6+
from ...long_running_tasks.task import TaskContext, TasksManager
7+
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
8+
from ._request import get_task_context
9+
10+
11+
class AiohttpLongRunningManager(BaseLongRunningManager):
12+
def __init__(
13+
self,
14+
app: web.Application,
15+
stale_task_check_interval: datetime.timedelta,
16+
stale_task_detect_timeout: datetime.timedelta,
17+
):
18+
self._app = app
19+
self._tasks_manager = TasksManager(
20+
stale_task_check_interval=stale_task_check_interval,
21+
stale_task_detect_timeout=stale_task_detect_timeout,
22+
)
23+
24+
@property
25+
def tasks_manager(self) -> TasksManager:
26+
return self._tasks_manager
27+
28+
async def setup(self) -> None:
29+
await self._tasks_manager.setup()
30+
31+
async def teardown(self) -> None:
32+
await self._tasks_manager.teardown()
33+
34+
@staticmethod
35+
def get_task_context(request: web.Request) -> TaskContext:
36+
return get_task_context(request)
37+
38+
39+
def get_long_running_manager(app: web.Application) -> AiohttpLongRunningManager:
40+
output: AiohttpLongRunningManager = app[APP_LONG_RUNNING_MANAGER_KEY]
41+
return output
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from typing import Any
2+
3+
from aiohttp import web
4+
5+
from ._constants import RQT_LONG_RUNNING_TASKS_CONTEXT_KEY
6+
7+
8+
def get_task_context(request: web.Request) -> dict[str, Any]:
9+
output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
10+
return output
Lines changed: 37 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
import logging
21
from typing import Any
32

43
from aiohttp import web
5-
from common_library.json_serialization import json_dumps
64
from pydantic import BaseModel
75
from servicelib.aiohttp import status
86

9-
from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError
7+
from ...long_running_tasks import http_endpoint_responses
108
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
11-
from ...long_running_tasks.task import TrackedTask
129
from ..requests_validation import parse_request_path_parameters_as
13-
from ._dependencies import get_task_context, get_tasks_manager
10+
from ..rest_responses import create_data_response
11+
from ._manager import get_long_running_manager
1412

15-
_logger = logging.getLogger(__name__)
1613
routes = web.RouteTableDef()
1714

1815

@@ -22,79 +19,58 @@ class _PathParam(BaseModel):
2219

2320
@routes.get("", name="list_tasks")
2421
async def list_tasks(request: web.Request) -> web.Response:
25-
tasks_manager = get_tasks_manager(request.app)
26-
task_context = get_task_context(request)
27-
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
28-
with_task_context=task_context
29-
)
30-
31-
return web.json_response(
32-
{
33-
"data": [
34-
TaskGet(
35-
task_id=t.task_id,
36-
task_name=t.task_name,
37-
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
38-
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
39-
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
40-
)
41-
for t in tracked_tasks
42-
]
43-
},
44-
dumps=json_dumps,
22+
long_running_manager = get_long_running_manager(request.app)
23+
return create_data_response(
24+
[
25+
TaskGet(
26+
task_id=t.task_id,
27+
task_name=t.task_name,
28+
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
29+
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
30+
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
31+
)
32+
for t in http_endpoint_responses.list_tasks(
33+
long_running_manager.tasks_manager,
34+
long_running_manager.get_task_context(request),
35+
)
36+
]
4537
)
4638

4739

4840
@routes.get("/{task_id}", name="get_task_status")
4941
async def get_task_status(request: web.Request) -> web.Response:
5042
path_params = parse_request_path_parameters_as(_PathParam, request)
51-
tasks_manager = get_tasks_manager(request.app)
52-
task_context = get_task_context(request)
43+
long_running_manager = get_long_running_manager(request.app)
5344

54-
task_status: TaskStatus = tasks_manager.get_task_status(
55-
task_id=path_params.task_id, with_task_context=task_context
45+
task_status: TaskStatus = http_endpoint_responses.get_task_status(
46+
long_running_manager.tasks_manager,
47+
long_running_manager.get_task_context(request),
48+
path_params.task_id,
5649
)
57-
return web.json_response({"data": task_status}, dumps=json_dumps)
50+
return create_data_response(task_status)
5851

5952

6053
@routes.get("/{task_id}/result", name="get_task_result")
6154
async def get_task_result(request: web.Request) -> web.Response | Any:
6255
path_params = parse_request_path_parameters_as(_PathParam, request)
63-
tasks_manager = get_tasks_manager(request.app)
64-
task_context = get_task_context(request)
56+
long_running_manager = get_long_running_manager(request.app)
6557

6658
# NOTE: this might raise an exception that will be catched by the _error_handlers
67-
try:
68-
task_result = tasks_manager.get_task_result(
69-
task_id=path_params.task_id, with_task_context=task_context
70-
)
71-
# NOTE: this will fail if the task failed for some reason....
72-
await tasks_manager.remove_task(
73-
path_params.task_id, with_task_context=task_context, reraise_errors=False
74-
)
75-
return task_result
76-
except (TaskNotFoundError, TaskNotCompletedError):
77-
raise
78-
except Exception:
79-
# the task shall be removed in this case
80-
await tasks_manager.remove_task(
81-
path_params.task_id, with_task_context=task_context, reraise_errors=False
82-
)
83-
raise
59+
return await http_endpoint_responses.get_task_result(
60+
long_running_manager.tasks_manager,
61+
long_running_manager.get_task_context(request),
62+
path_params.task_id,
63+
)
8464

8565

8666
@routes.delete("/{task_id}", name="cancel_and_delete_task")
8767
async def cancel_and_delete_task(request: web.Request) -> web.Response:
8868
path_params = parse_request_path_parameters_as(_PathParam, request)
89-
tasks_manager = get_tasks_manager(request.app)
90-
task_context = get_task_context(request)
91-
await tasks_manager.remove_task(path_params.task_id, with_task_context=task_context)
92-
return web.json_response(status=status.HTTP_204_NO_CONTENT)
69+
long_running_manager = get_long_running_manager(request.app)
9370

94-
95-
__all__: tuple[str, ...] = (
96-
"get_tasks_manager",
97-
"TaskId",
98-
"TaskGet",
99-
"TaskStatus",
100-
)
71+
await http_endpoint_responses.remove_task(
72+
long_running_manager.tasks_manager,
73+
long_running_manager.get_task_context(request),
74+
path_params.task_id,
75+
)
76+
return web.json_response(status=status.HTTP_204_NO_CONTENT)

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,28 @@
11
import asyncio
2-
import logging
2+
import datetime
33
from collections.abc import AsyncGenerator, Callable
44
from functools import wraps
55
from typing import Any
66

77
from aiohttp import web
88
from common_library.json_serialization import json_dumps
9-
from pydantic import AnyHttpUrl, PositiveFloat, TypeAdapter
9+
from pydantic import AnyHttpUrl, TypeAdapter
1010

1111
from ...aiohttp import status
12-
from ...long_running_tasks.models import TaskGet
13-
from ...long_running_tasks.task import (
14-
TaskContext,
15-
TaskProtocol,
16-
TasksManager,
17-
start_task,
12+
from ...long_running_tasks.constants import (
13+
DEFAULT_STALE_TASK_CHECK_INTERVAL,
14+
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
1815
)
16+
from ...long_running_tasks.models import TaskGet
17+
from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task
1918
from ..typing_extension import Handler
2019
from . import _routes
2120
from ._constants import (
22-
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
23-
MINUTE,
21+
APP_LONG_RUNNING_MANAGER_KEY,
2422
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
2523
)
26-
from ._dependencies import create_task_name_from_request, get_tasks_manager
2724
from ._error_handlers import base_long_running_error_handler
28-
29-
_logger = logging.getLogger(__name__)
25+
from ._manager import AiohttpLongRunningManager, get_long_running_manager
3026

3127

3228
def no_ops_decorator(handler: Handler):
@@ -42,6 +38,10 @@ async def _wrap(request: web.Request):
4238
return _wrap
4339

4440

41+
def _create_task_name_from_request(request: web.Request) -> str:
42+
return f"{request.method} {request.rel_url}"
43+
44+
4545
async def start_long_running_task(
4646
# NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys
4747
request_: web.Request,
@@ -51,12 +51,12 @@ async def start_long_running_task(
5151
task_context: TaskContext,
5252
**task_kwargs: Any,
5353
) -> web.Response:
54-
task_manager = get_tasks_manager(request_.app)
55-
task_name = create_task_name_from_request(request_)
54+
long_running_manager = get_long_running_manager(request_.app)
55+
task_name = _create_task_name_from_request(request_)
5656
task_id = None
5757
try:
5858
task_id = start_task(
59-
task_manager,
59+
long_running_manager.tasks_manager,
6060
task_,
6161
fire_and_forget=fire_and_forget,
6262
task_context=task_context,
@@ -91,8 +91,10 @@ async def start_long_running_task(
9191
except asyncio.CancelledError:
9292
# cancel the task, the client has disconnected
9393
if task_id:
94-
task_manager = get_tasks_manager(request_.app)
95-
await task_manager.cancel_task(task_id, with_task_context=None)
94+
long_running_manager = get_long_running_manager(request_.app)
95+
await long_running_manager.tasks_manager.cancel_task(
96+
task_id, with_task_context=None
97+
)
9698
raise
9799

98100

@@ -121,8 +123,8 @@ def setup(
121123
router_prefix: str,
122124
handler_check_decorator: Callable = no_ops_decorator,
123125
task_request_context_decorator: Callable = no_task_context_decorator,
124-
stale_task_check_interval_s: PositiveFloat = 1 * MINUTE,
125-
stale_task_detect_timeout_s: PositiveFloat = 5 * MINUTE,
126+
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
127+
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
126128
) -> None:
127129
"""
128130
- `router_prefix` APIs are mounted on `/...`, this
@@ -135,21 +137,24 @@ def setup(
135137
"""
136138

137139
async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
140+
# add error handlers
141+
app.middlewares.append(base_long_running_error_handler)
142+
138143
# add components to state
139-
app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] = long_running_task_manager = (
140-
TasksManager(
141-
stale_task_check_interval_s=stale_task_check_interval_s,
142-
stale_task_detect_timeout_s=stale_task_detect_timeout_s,
144+
app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = (
145+
AiohttpLongRunningManager(
146+
app=app,
147+
stale_task_check_interval=stale_task_check_interval,
148+
stale_task_detect_timeout=stale_task_detect_timeout,
143149
)
144150
)
145151

146-
# add error handlers
147-
app.middlewares.append(base_long_running_error_handler)
152+
await long_running_manager.setup()
148153

149154
yield
150155

151156
# cleanup
152-
await long_running_task_manager.close()
157+
await long_running_manager.teardown()
153158

154159
# add routing (done at setup-time)
155160
_wrap_and_add_routes(

0 commit comments

Comments
 (0)