Skip to content

Commit d0d8f85

Browse files
author
Andrei Neagu
committed
using long_running_manager for aiohttp
1 parent cb5064b commit d0d8f85

File tree

10 files changed

+99
-70
lines changed

10 files changed

+99
-70
lines changed

packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ class TaskResult(BaseModel):
1818
error: Any | None
1919

2020

21-
class TaskGetWithoutHref(BaseModel):
21+
class TaskBase(BaseModel):
2222
task_id: TaskId
2323
task_name: str
2424

2525

26-
class TaskGet(TaskGetWithoutHref):
26+
class TaskGet(TaskBase):
2727
status_href: str
2828
result_href: str
2929
abort_href: str

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_constants.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from pydantic import PositiveFloat
44

55
MINUTE: Final[PositiveFloat] = 60
6-
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[
7-
str
8-
] = f"{__name__ }.long_running_tasks.tasks_manager"
9-
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[
10-
str
11-
] = f"{__name__}.long_running_tasks.context"
6+
APP_LONG_RUNNING_MANAGER_KEY: Final[str] = (
7+
f"{__name__ }.long_running_tasks.tasks_manager"
8+
)
9+
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[str] = (
10+
f"{__name__}.long_running_tasks.context"
11+
)
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
from aiohttp import web
1+
from typing import Any
22

3-
from ...long_running_tasks.task import TasksManager
4-
from ._constants import APP_LONG_RUNNING_TASKS_MANAGER_KEY
3+
from aiohttp import web
54

6-
# NOTE: figure out how to remove these and expose them differently if possible
5+
from ._constants import RQT_LONG_RUNNING_TASKS_CONTEXT_KEY
76

87

9-
def get_tasks_manager(app: web.Application) -> TasksManager:
10-
output: TasksManager = app[APP_LONG_RUNNING_TASKS_MANAGER_KEY]
8+
def get_task_context(request: web.Request) -> dict[str, Any]:
9+
output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
1110
return output
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import datetime
2+
3+
from aiohttp import web
4+
5+
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
6+
from ...long_running_tasks.task import TaskContext, TasksManager
7+
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
8+
from ._dependencies import get_task_context
9+
10+
11+
class AiohttpLongRunningManager(BaseLongRunningManager):
12+
def __init__(
13+
self,
14+
app: web.Application,
15+
stale_task_check_interval: datetime.timedelta,
16+
stale_task_detect_timeout: datetime.timedelta,
17+
):
18+
self._app = app
19+
self._tasks_manager = TasksManager(
20+
stale_task_check_interval=stale_task_check_interval,
21+
stale_task_detect_timeout=stale_task_detect_timeout,
22+
)
23+
24+
@property
25+
def tasks_manager(self) -> TasksManager:
26+
return self._tasks_manager
27+
28+
async def setup(self) -> None:
29+
await self._tasks_manager.setup()
30+
31+
async def teardown(self) -> None:
32+
await self._tasks_manager.teardown()
33+
34+
@staticmethod
35+
def get_task_context(request: web.Request) -> TaskContext:
36+
return get_task_context(request)
37+
38+
39+
def get_long_running_manager(app: web.Application) -> AiohttpLongRunningManager:
40+
output: AiohttpLongRunningManager = app[APP_LONG_RUNNING_MANAGER_KEY]
41+
return output

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,23 @@
33
from aiohttp import web
44
from pydantic import BaseModel
55
from servicelib.aiohttp import status
6-
from servicelib.aiohttp.rest_responses import create_data_response
76

87
from ...long_running_tasks import http_endpoint_responses
98
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
109
from ..requests_validation import parse_request_path_parameters_as
11-
from ._constants import RQT_LONG_RUNNING_TASKS_CONTEXT_KEY
12-
from ._dependencies import get_tasks_manager
10+
from ..rest_responses import create_data_response
11+
from ._manager import get_long_running_manager
1312

1413
routes = web.RouteTableDef()
1514

1615

17-
def get_task_context(request: web.Request) -> dict[str, Any]:
18-
output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
19-
return output
20-
21-
2216
class _PathParam(BaseModel):
2317
task_id: TaskId
2418

2519

2620
@routes.get("", name="list_tasks")
2721
async def list_tasks(request: web.Request) -> web.Response:
28-
tasks_manager = get_tasks_manager(request.app)
29-
task_context = get_task_context(request)
22+
long_running_manager = get_long_running_manager(request.app)
3023
return create_data_response(
3124
[
3225
TaskGet(
@@ -36,41 +29,48 @@ async def list_tasks(request: web.Request) -> web.Response:
3629
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
3730
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
3831
)
39-
for t in http_endpoint_responses.list_tasks(tasks_manager, task_context)
32+
for t in http_endpoint_responses.list_tasks(
33+
long_running_manager.tasks_manager,
34+
long_running_manager.get_task_context(request),
35+
)
4036
]
4137
)
4238

4339

4440
@routes.get("/{task_id}", name="get_task_status")
4541
async def get_task_status(request: web.Request) -> web.Response:
4642
path_params = parse_request_path_parameters_as(_PathParam, request)
47-
tasks_manager = get_tasks_manager(request.app)
48-
task_context = get_task_context(request)
43+
long_running_manager = get_long_running_manager(request.app)
4944

5045
task_status: TaskStatus = http_endpoint_responses.get_task_status(
51-
tasks_manager, task_context, path_params.task_id
46+
long_running_manager.tasks_manager,
47+
long_running_manager.get_task_context(request),
48+
path_params.task_id,
5249
)
5350
return create_data_response(task_status)
5451

5552

5653
@routes.get("/{task_id}/result", name="get_task_result")
5754
async def get_task_result(request: web.Request) -> web.Response | Any:
5855
path_params = parse_request_path_parameters_as(_PathParam, request)
59-
tasks_manager = get_tasks_manager(request.app)
60-
task_context = get_task_context(request)
56+
long_running_manager = get_long_running_manager(request.app)
6157

6258
# NOTE: this might raise an exception that will be catched by the _error_handlers
6359
return await http_endpoint_responses.get_task_result(
64-
tasks_manager, task_context, path_params.task_id
60+
long_running_manager.tasks_manager,
61+
long_running_manager.get_task_context(request),
62+
path_params.task_id,
6563
)
6664

6765

6866
@routes.delete("/{task_id}", name="cancel_and_delete_task")
6967
async def cancel_and_delete_task(request: web.Request) -> web.Response:
7068
path_params = parse_request_path_parameters_as(_PathParam, request)
71-
tasks_manager = get_tasks_manager(request.app)
72-
task_context = get_task_context(request)
69+
long_running_manager = get_long_running_manager(request.app)
70+
7371
await http_endpoint_responses.remove_task(
74-
tasks_manager, task_context, path_params.task_id
72+
long_running_manager.tasks_manager,
73+
long_running_manager.get_task_context(request),
74+
path_params.task_id,
7575
)
7676
return web.json_response(status=status.HTTP_204_NO_CONTENT)

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import datetime
3-
import logging
43
from collections.abc import AsyncGenerator, Callable
54
from functools import wraps
65
from typing import Any
@@ -15,22 +14,15 @@
1514
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
1615
)
1716
from ...long_running_tasks.models import TaskGet
18-
from ...long_running_tasks.task import (
19-
TaskContext,
20-
TaskProtocol,
21-
TasksManager,
22-
start_task,
23-
)
17+
from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task
2418
from ..typing_extension import Handler
2519
from . import _routes
2620
from ._constants import (
27-
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
21+
APP_LONG_RUNNING_MANAGER_KEY,
2822
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
2923
)
30-
from ._dependencies import get_tasks_manager
3124
from ._error_handlers import base_long_running_error_handler
32-
33-
_logger = logging.getLogger(__name__)
25+
from ._manager import AiohttpLongRunningManager, get_long_running_manager
3426

3527

3628
def no_ops_decorator(handler: Handler):
@@ -59,12 +51,12 @@ async def start_long_running_task(
5951
task_context: TaskContext,
6052
**task_kwargs: Any,
6153
) -> web.Response:
62-
task_manager = get_tasks_manager(request_.app)
54+
long_running_manager = get_long_running_manager(request_.app)
6355
task_name = _create_task_name_from_request(request_)
6456
task_id = None
6557
try:
6658
task_id = start_task(
67-
task_manager,
59+
long_running_manager.tasks_manager,
6860
task_,
6961
fire_and_forget=fire_and_forget,
7062
task_context=task_context,
@@ -99,8 +91,10 @@ async def start_long_running_task(
9991
except asyncio.CancelledError:
10092
# cancel the task, the client has disconnected
10193
if task_id:
102-
task_manager = get_tasks_manager(request_.app)
103-
await task_manager.cancel_task(task_id, with_task_context=None)
94+
long_running_manager = get_long_running_manager(request_.app)
95+
await long_running_manager.tasks_manager.cancel_task(
96+
task_id, with_task_context=None
97+
)
10498
raise
10599

106100

@@ -147,19 +141,20 @@ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
147141
app.middlewares.append(base_long_running_error_handler)
148142

149143
# add components to state
150-
app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] = long_running_task_manager = (
151-
TasksManager(
144+
app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = (
145+
AiohttpLongRunningManager(
146+
app=app,
152147
stale_task_check_interval=stale_task_check_interval,
153148
stale_task_detect_timeout=stale_task_detect_timeout,
154149
)
155150
)
156151

157-
await long_running_task_manager.setup()
152+
await long_running_manager.setup()
158153

159154
yield
160155

161156
# cleanup
162-
await long_running_task_manager.teardown()
157+
await long_running_manager.teardown()
163158

164159
# add routing (done at setup-time)
165160
_wrap_and_add_routes(

packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@
66
running task.
77
"""
88

9-
from ._dependencies import get_tasks_manager
10-
from ._routes import get_task_context
9+
from ._manager import get_long_running_manager
1110
from ._server import setup, start_long_running_task
1211

1312
__all__: tuple[str, ...] = (
14-
"get_task_context",
15-
"get_tasks_manager",
13+
"get_long_running_manager",
1614
"setup",
1715
"start_long_running_task",
1816
)

packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
from typing import Any
22

3-
from .models import TaskGetWithoutHref, TaskId, TaskStatus
3+
from .models import TaskBase, TaskId, TaskStatus
44
from .task import TaskContext, TasksManager, TrackedTask
55

66

77
def list_tasks(
88
tasks_manager: TasksManager, task_context: TaskContext | None
9-
) -> list[TaskGetWithoutHref]:
9+
) -> list[TaskBase]:
1010
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
1111
with_task_context=task_context
1212
)
13-
return [
14-
TaskGetWithoutHref(task_id=t.task_id, task_name=t.task_name)
15-
for t in tracked_tasks
16-
]
13+
return [TaskBase(task_id=t.task_id, task_name=t.task_name) for t in tracked_tasks]
1714

1815

1916
def get_task_status(

packages/service-library/src/servicelib/long_running_tasks/models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
TaskProgress,
1313
)
1414
from models_library.api_schemas_long_running_tasks.tasks import (
15+
TaskBase,
1516
TaskGet,
16-
TaskGetWithoutHref,
1717
TaskResult,
1818
TaskStatus,
1919
)
@@ -79,7 +79,7 @@ async def result(self) -> Any:
7979
"ProgressMessage",
8080
"ProgressPercent",
8181
"TaskGet",
82-
"TaskGetWithoutHref",
82+
"TaskBase",
8383
"TaskId",
8484
"TaskProgress",
8585
"TaskResult",

services/web/server/src/simcore_service_webserver/tasks/_rest.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
from pydantic import BaseModel
2323
from servicelib.aiohttp import status
2424
from servicelib.aiohttp.long_running_tasks.server import (
25-
get_task_context,
26-
get_tasks_manager,
25+
get_long_running_manager,
2726
)
2827
from servicelib.aiohttp.requests_validation import (
2928
parse_request_path_parameters_as,
@@ -57,10 +56,10 @@
5756
@handle_export_data_exceptions
5857
@webserver_request_context_decorator
5958
async def get_async_jobs(request: web.Request) -> web.Response:
60-
inprocess_task_manager = get_tasks_manager(request.app)
61-
inprocess_task_context = get_task_context(request)
59+
inprocess_long_running_manager = get_long_running_manager(request.app)
6260
inprocess_tracked_tasks = http_endpoint_responses.list_tasks(
63-
inprocess_task_manager, inprocess_task_context
61+
inprocess_long_running_manager.tasks_manager,
62+
inprocess_long_running_manager.get_task_context(request),
6463
)
6564

6665
_req_ctx = AuthenticatedRequestContext.model_validate(request)

0 commit comments

Comments
 (0)