Skip to content

Commit 72ac74d

Browse files
authored
✨ Long running tasks: allow retrieving tasks by user (ITISFoundation#3268)
1 parent 38b6073 commit 72ac74d

File tree

28 files changed

+1415
-706
lines changed

28 files changed

+1415
-706
lines changed

api/specs/webserver/openapi-tasks.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
11
paths:
2+
/tasks:
3+
get:
4+
operationId: list_tasks
5+
tags:
6+
- tasks
7+
responses:
8+
"200":
9+
description: Returns the list of active tasks (running and/or done)
10+
content:
11+
application/json:
12+
schema:
13+
type: array
14+
items:
15+
$ref: "./components/schemas/task.yaml#/TaskEnveloped"
16+
217
/tasks/{task_id}:
318
parameters:
419
- name: task_id

api/specs/webserver/openapi.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ paths:
326326
$ref: "./openapi-clusters.yaml#/paths/director_v2_clusters_cluster_id_details"
327327

328328
# TASKS --------------------------------------------------------------------------
329-
# /tasks:
330-
# $ref: "./openapi-tasks.yaml#/paths/~1tasks"
329+
/tasks:
330+
$ref: "./openapi-tasks.yaml#/paths/~1tasks"
331331

332332
/tasks/{task_id}:
333333
$ref: "./openapi-tasks.yaml#/paths/~1tasks~1{task_id}"

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,9 @@
33
from pydantic import PositiveFloat
44

55
MINUTE: Final[PositiveFloat] = 60
6-
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[str] = f"{__name__ }.long_running_tasks"
6+
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[
7+
str
8+
] = f"{__name__ }.long_running_tasks.tasks_manager"
9+
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[
10+
str
11+
] = f"{__name__}.long_running_tasks.context"
Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1+
from typing import Any
2+
13
from aiohttp import web
24

35
from ...long_running_tasks._task import TasksManager
4-
from ._constants import APP_LONG_RUNNING_TASKS_MANAGER_KEY
6+
from ._constants import (
7+
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
8+
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
9+
)
510

611

712
def get_tasks_manager(app: web.Application) -> TasksManager:
813
return app[APP_LONG_RUNNING_TASKS_MANAGER_KEY]
14+
15+
16+
def get_task_context(request: web.Request) -> dict[str, Any]:
17+
return request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
18+
19+
20+
def create_task_name_from_request(request: web.Request) -> str:
21+
return f"{request.method} {request.rel_url}"

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66

77
from ...json_serialization import json_dumps
88
from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
9-
from ...long_running_tasks._models import TaskId, TaskStatus
9+
from ...long_running_tasks._models import TaskGet, TaskId, TaskStatus
10+
from ...long_running_tasks._task import TrackedTask
1011
from ...mimetype_constants import MIMETYPE_APPLICATION_JSON
11-
from ._dependencies import get_tasks_manager
12+
from ._dependencies import get_task_context, get_tasks_manager
1213

1314
log = logging.getLogger(__name__)
1415
routes = web.RouteTableDef()
@@ -18,37 +19,73 @@ class _PathParam(BaseModel):
1819
task_id: TaskId
1920

2021

22+
@routes.get("", name="list_tasks")
23+
async def list_tasks(request: web.Request) -> web.Response:
24+
tasks_manager = get_tasks_manager(request.app)
25+
task_context = get_task_context(request)
26+
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
27+
with_task_context=task_context
28+
)
29+
30+
return web.json_response(
31+
{
32+
"data": [
33+
TaskGet(
34+
task_id=t.task_id,
35+
task_name=t.task_name,
36+
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
37+
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
38+
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
39+
)
40+
for t in tracked_tasks
41+
]
42+
},
43+
dumps=json_dumps,
44+
)
45+
46+
2147
@routes.get("/{task_id}", name="get_task_status")
2248
async def get_task_status(request: web.Request) -> web.Response:
2349
path_params = parse_request_path_parameters_as(_PathParam, request)
2450
tasks_manager = get_tasks_manager(request.app)
51+
task_context = get_task_context(request)
2552

26-
task_status: TaskStatus = tasks_manager.get_task_status(task_id=path_params.task_id)
53+
task_status: TaskStatus = tasks_manager.get_task_status(
54+
task_id=path_params.task_id, with_task_context=task_context
55+
)
2756
return web.json_response({"data": task_status}, dumps=json_dumps)
2857

2958

3059
@routes.get("/{task_id}/result", name="get_task_result")
3160
async def get_task_result(request: web.Request) -> web.Response:
3261
path_params = parse_request_path_parameters_as(_PathParam, request)
3362
tasks_manager = get_tasks_manager(request.app)
63+
task_context = get_task_context(request)
3464

3565
# NOTE: this might raise an exception that will be catched by the _error_handlers
3666
try:
37-
task_result = tasks_manager.get_task_result(task_id=path_params.task_id)
67+
task_result = tasks_manager.get_task_result(
68+
task_id=path_params.task_id, with_task_context=task_context
69+
)
3870
# NOTE: this will fail if the task failed for some reason....
39-
await tasks_manager.remove_task(path_params.task_id, reraise_errors=False)
71+
await tasks_manager.remove_task(
72+
path_params.task_id, with_task_context=task_context, reraise_errors=False
73+
)
4074
return task_result
4175
except (TaskNotFoundError, TaskNotCompletedError):
4276
raise
4377
except Exception:
4478
# the task shall be removed in this case
45-
await tasks_manager.remove_task(path_params.task_id, reraise_errors=False)
79+
await tasks_manager.remove_task(
80+
path_params.task_id, with_task_context=task_context, reraise_errors=False
81+
)
4682
raise
4783

4884

4985
@routes.delete("/{task_id}", name="cancel_and_delete_task")
5086
async def cancel_and_delete_task(request: web.Request) -> web.Response:
5187
path_params = parse_request_path_parameters_as(_PathParam, request)
5288
tasks_manager = get_tasks_manager(request.app)
53-
await tasks_manager.remove_task(path_params.task_id)
89+
task_context = get_task_context(request)
90+
await tasks_manager.remove_task(path_params.task_id, with_task_context=task_context)
5491
raise web.HTTPNoContent(content_type=MIMETYPE_APPLICATION_JSON)

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,48 @@
11
import logging
2-
from typing import AsyncGenerator
2+
from functools import wraps
3+
from typing import AsyncGenerator, Callable
34

45
from aiohttp import web
56
from pydantic import PositiveFloat
67

78
from ...long_running_tasks._task import TasksManager
8-
from ._constants import APP_LONG_RUNNING_TASKS_MANAGER_KEY, MINUTE
9+
from ..typing_extension import Handler
10+
from ._constants import (
11+
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
12+
MINUTE,
13+
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
14+
)
915
from ._error_handlers import base_long_running_error_handler
1016
from ._routes import routes
1117

1218
log = logging.getLogger(__name__)
1319

1420

21+
def no_ops_decorator(handler: Handler):
22+
return handler
23+
24+
25+
def no_task_context_decorator(handler: Handler):
26+
@wraps(handler)
27+
async def _wrap(request: web.Request):
28+
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = {}
29+
return await handler(request)
30+
31+
return _wrap
32+
33+
1534
def setup(
1635
app: web.Application,
1736
*,
1837
router_prefix: str,
38+
handler_check_decorator: Callable = no_ops_decorator,
39+
task_request_context_decorator: Callable = no_task_context_decorator,
1940
stale_task_check_interval_s: PositiveFloat = 1 * MINUTE,
2041
stale_task_detect_timeout_s: PositiveFloat = 5 * MINUTE,
2142
) -> None:
2243
"""
23-
- `router_prefix` APIs are mounted on `/task/...`, this
24-
will change them to be mounted as `{router_prefix}/task/...`
44+
- `router_prefix` APIs are mounted on `/...`, this
45+
will change them to be mounted as `{router_prefix}/...`
2546
- `stale_task_check_interval_s` interval at which the
2647
TaskManager checks for tasks which are no longer being
2748
actively monitored by a client
@@ -35,10 +56,9 @@ async def on_startup(app: web.Application) -> AsyncGenerator[None, None]:
3556
app.router.add_route(
3657
route.method, # type: ignore
3758
f"{router_prefix}{route.path}", # type: ignore
38-
route.handler, # type: ignore
59+
handler_check_decorator(task_request_context_decorator(route.handler)), # type: ignore
3960
**route.kwargs, # type: ignore
4061
)
41-
# app.router.add_routes(routes)
4262

4363
# add components to state
4464
app[

packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,27 @@
1010
TaskCancelledError,
1111
TaskId,
1212
TaskProgress,
13+
TaskProtocol,
1314
TasksManager,
1415
TaskStatus,
1516
start_task,
1617
)
17-
from ._dependencies import get_tasks_manager
18+
from ._dependencies import create_task_name_from_request, get_tasks_manager
19+
from ._routes import TaskGet
1820
from ._server import setup
1921

2022
__all__: tuple[str, ...] = (
23+
"create_task_name_from_request",
2124
"get_tasks_manager",
2225
"setup",
2326
"start_task",
2427
"TaskAlreadyRunningError",
2528
"TaskCancelledError",
2629
"TaskId",
30+
"TaskGet",
2731
"TasksManager",
2832
"TaskProgress",
33+
"TaskProtocol",
2934
"TaskStatus",
3035
)
3136

packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def _before_sleep_log(
3434
"""Before call strategy that logs to some logger the attempt."""
3535

3636
def log_it(retry_state: "RetryCallState") -> None:
37+
assert retry_state.outcome # nosec
3738
if retry_state.outcome.failed:
3839
ex = retry_state.outcome.exception()
3940
verb, value = "raised", f"{ex.__class__.__name__}: {ex}"
@@ -46,6 +47,7 @@ def log_it(retry_state: "RetryCallState") -> None:
4647
verb, value = "returned", retry_state.outcome.result()
4748
local_exc_info = False # exc_info does not apply when no exception
4849

50+
assert retry_state.next_action # nosec
4951
logger.warning(
5052
"Retrying '%s %s %s' in %s seconds as it %s %s. %s",
5153
request_function.__name__,

packages/service-library/src/servicelib/fastapi/long_running_tasks/_error_handlers.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
from starlette.requests import Request
44
from starlette.responses import JSONResponse
55

6-
from ...long_running_tasks._errors import BaseLongRunningError, TaskNotFoundError
6+
from ...long_running_tasks._errors import (
7+
BaseLongRunningError,
8+
TaskNotCompletedError,
9+
TaskNotFoundError,
10+
)
711

812

913
async def base_long_running_error_handler(
@@ -12,7 +16,7 @@ async def base_long_running_error_handler(
1216
error_fields = dict(code=exception.code, message=f"{exception}")
1317
status_code = (
1418
status.HTTP_404_NOT_FOUND
15-
if isinstance(exception, TaskNotFoundError)
19+
if isinstance(exception, (TaskNotFoundError, TaskNotCompletedError))
1620
else status.HTTP_400_BAD_REQUEST
1721
)
1822
return JSONResponse(content=jsonable_encoder(error_fields), status_code=status_code)

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,32 @@
11
from fastapi import APIRouter, Depends, Request, status
22

33
from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
4-
from ...long_running_tasks._models import TaskId, TaskResult, TaskStatus
4+
from ...long_running_tasks._models import TaskGet, TaskId, TaskResult, TaskStatus
55
from ...long_running_tasks._task import TasksManager
66
from ..requests_decorators import cancel_on_disconnect
77
from ._dependencies import get_tasks_manager
88

99
router = APIRouter(prefix="/task")
1010

1111

12+
@router.get("", response_model=list[TaskGet])
13+
@cancel_on_disconnect
14+
async def list_tasks(
15+
request: Request, tasks_manager: TasksManager = Depends(get_tasks_manager)
16+
) -> list[TaskGet]:
17+
assert request # nosec
18+
return [
19+
TaskGet(
20+
task_id=t.task_id,
21+
task_name=t.task_name,
22+
status_href="",
23+
result_href="",
24+
abort_href="",
25+
)
26+
for t in tasks_manager.list_tasks(with_task_context=None)
27+
]
28+
29+
1230
@router.get(
1331
"/{task_id}",
1432
responses={
@@ -22,7 +40,7 @@ async def get_task_status(
2240
tasks_manager: TasksManager = Depends(get_tasks_manager),
2341
) -> TaskStatus:
2442
assert request # nosec
25-
return tasks_manager.get_task_status(task_id=task_id)
43+
return tasks_manager.get_task_status(task_id=task_id, with_task_context=None)
2644

2745

2846
@router.get(
@@ -44,13 +62,17 @@ async def get_task_result(
4462
# TODO: refactor this to use same as in https://github.com/ITISFoundation/osparc-simcore/issues/3265
4563
try:
4664
task_result = tasks_manager.get_task_result_old(task_id=task_id)
47-
await tasks_manager.remove_task(task_id, reraise_errors=False)
65+
await tasks_manager.remove_task(
66+
task_id, with_task_context=None, reraise_errors=False
67+
)
4868
return task_result
4969
except (TaskNotFoundError, TaskNotCompletedError):
5070
raise
5171
except Exception:
5272
# the task shall be removed in this case
53-
await tasks_manager.remove_task(task_id, reraise_errors=False)
73+
await tasks_manager.remove_task(
74+
task_id, with_task_context=None, reraise_errors=False
75+
)
5476
raise
5577

5678

@@ -70,4 +92,4 @@ async def cancel_and_delete_task(
7092
tasks_manager: TasksManager = Depends(get_tasks_manager),
7193
) -> None:
7294
assert request # nosec
73-
await tasks_manager.remove_task(task_id)
95+
await tasks_manager.remove_task(task_id, with_task_context=None)

0 commit comments

Comments
 (0)