Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
07dbc67
changing visibility
Jun 5, 2025
4c689a2
fixed imports
Jun 5, 2025
7977cd7
fixed imports
Jun 5, 2025
d7f04d1
removed unrequired
Jun 5, 2025
f50e77d
refactor imports
Jun 5, 2025
4cfa96d
refactor more imports
Jun 5, 2025
421f8cf
refactor
Jun 5, 2025
e737641
fixed import
Jun 6, 2025
ada5069
fixed broken imports
Jun 6, 2025
7070b66
fixed imports
Jun 6, 2025
61daf54
fixeed imports
Jun 6, 2025
2e752e4
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 6, 2025
75aa300
added missing specs
Jun 6, 2025
d943710
removed unused code
Jun 6, 2025
6a6677c
remove unrequired
Jun 6, 2025
47e5667
removed unused
Jun 6, 2025
e2dfb66
Merge remote-tracking branch 'upstream/master' into long-running-refa…
Jun 10, 2025
87dec05
reverterd interface change
Jun 10, 2025
ebe6a43
reverted
Jun 10, 2025
f630a39
updated openapi specs
Jun 10, 2025
757be74
using commond part to format responses
Jun 10, 2025
8497191
minor refacto
Jun 10, 2025
866656d
removed unused
Jun 10, 2025
2ee9203
refactor to use common constants
Jun 10, 2025
d886db0
using common renaming
Jun 10, 2025
e7df051
Merge remote-tracking branch 'upstream/master' into long-running-refa…
Jun 11, 2025
ecd1987
fixed imports
Jun 11, 2025
ea17b89
removed unused
Jun 11, 2025
0578e5c
refacto tasks
Jun 11, 2025
a4660de
minor
Jun 11, 2025
70a6921
revert changes
Jun 11, 2025
8374de5
fixed specs
Jun 11, 2025
221e051
fixed failing tests
Jun 11, 2025
fd50810
fixed imports
Jun 11, 2025
08a65e7
pylint
Jun 11, 2025
c30f80d
removed some constants
Jun 11, 2025
87b6b45
moved module
Jun 11, 2025
8d894ef
rename module
Jun 11, 2025
d2dbc95
import from correct place
Jun 11, 2025
8462647
refactor internals
Jun 11, 2025
cb5064b
refactor
Jun 12, 2025
d0d8f85
using long_running_manager for aiohttp
Jun 12, 2025
01210d5
refactor internals
Jun 12, 2025
8f4f4f4
Merge remote-tracking branch 'upstream/master' into long-running-refa…
Jun 12, 2025
aa03848
fastapi base migrated to long_running_manager
Jun 12, 2025
edea03d
fixed tests
Jun 12, 2025
19756ba
using annotations
Jun 12, 2025
66ec8e6
using annotations
Jun 12, 2025
b1d46bc
refactor servies to use proper interface
Jun 12, 2025
37192b6
fixed test
Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def cancel_async_job(

@router.get(
"/tasks/{task_id}/result",
response_model=Any,
name="get_task_result",
description="Retrieves the result of a task",
responses=_export_data_responses,
Expand Down
3 changes: 2 additions & 1 deletion api/specs/web-server/_long_running_tasks_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# pylint: disable=too-many-arguments


from typing import Annotated
from typing import Annotated, Any

from fastapi import APIRouter, Depends, status
from models_library.generics import Envelope
Expand Down Expand Up @@ -54,6 +54,7 @@ def cancel_and_delete_task(
@router.get(
"/{task_id}/result",
name="get_task_result",
response_model=Any,
description="Retrieves the result of a task",
)
def get_task_result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ class TaskResult(BaseModel):
error: Any | None


class TaskGet(BaseModel):
class TaskBase(BaseModel):
task_id: TaskId
task_name: str


class TaskGet(TaskBase):
status_href: str
result_href: str
abort_href: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from pydantic import PositiveFloat

MINUTE: Final[PositiveFloat] = 60
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[
str
] = f"{__name__ }.long_running_tasks.tasks_manager"
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[
str
] = f"{__name__}.long_running_tasks.context"
APP_LONG_RUNNING_MANAGER_KEY: Final[str] = (
f"{__name__ }.long_running_tasks.tasks_manager"
)
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[str] = (
f"{__name__}.long_running_tasks.context"
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import datetime

from aiohttp import web

from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
from ...long_running_tasks.task import TaskContext, TasksManager
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
from ._request import get_task_context


class AiohttpLongRunningManager(BaseLongRunningManager):
def __init__(
self,
app: web.Application,
stale_task_check_interval: datetime.timedelta,
stale_task_detect_timeout: datetime.timedelta,
):
self._app = app
self._tasks_manager = TasksManager(
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
)

@property
def tasks_manager(self) -> TasksManager:
return self._tasks_manager

async def setup(self) -> None:
await self._tasks_manager.setup()

async def teardown(self) -> None:
await self._tasks_manager.teardown()

@staticmethod
def get_task_context(request: web.Request) -> TaskContext:
return get_task_context(request)


def get_long_running_manager(app: web.Application) -> AiohttpLongRunningManager:
output: AiohttpLongRunningManager = app[APP_LONG_RUNNING_MANAGER_KEY]
return output
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Any

from aiohttp import web

from ._constants import RQT_LONG_RUNNING_TASKS_CONTEXT_KEY


def get_task_context(request: web.Request) -> dict[str, Any]:
output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
return output
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import logging
from typing import Any

from aiohttp import web
from common_library.json_serialization import json_dumps
from pydantic import BaseModel
from servicelib.aiohttp import status

from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks import http_endpoint_responses
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
from ...long_running_tasks.task import TrackedTask
from ..requests_validation import parse_request_path_parameters_as
from ._dependencies import get_task_context, get_tasks_manager
from ..rest_responses import create_data_response
from ._manager import get_long_running_manager

_logger = logging.getLogger(__name__)
routes = web.RouteTableDef()


Expand All @@ -22,79 +19,58 @@ class _PathParam(BaseModel):

@routes.get("", name="list_tasks")
async def list_tasks(request: web.Request) -> web.Response:
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
with_task_context=task_context
)

return web.json_response(
{
"data": [
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in tracked_tasks
]
},
dumps=json_dumps,
long_running_manager = get_long_running_manager(request.app)
return create_data_response(
[
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in http_endpoint_responses.list_tasks(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
)
]
)


@routes.get("/{task_id}", name="get_task_status")
async def get_task_status(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
long_running_manager = get_long_running_manager(request.app)

task_status: TaskStatus = tasks_manager.get_task_status(
task_id=path_params.task_id, with_task_context=task_context
task_status: TaskStatus = http_endpoint_responses.get_task_status(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
)
return web.json_response({"data": task_status}, dumps=json_dumps)
return create_data_response(task_status)


@routes.get("/{task_id}/result", name="get_task_result")
async def get_task_result(request: web.Request) -> web.Response | Any:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
long_running_manager = get_long_running_manager(request.app)

# NOTE: this might raise an exception that will be catched by the _error_handlers
try:
task_result = tasks_manager.get_task_result(
task_id=path_params.task_id, with_task_context=task_context
)
# NOTE: this will fail if the task failed for some reason....
await tasks_manager.remove_task(
path_params.task_id, with_task_context=task_context, reraise_errors=False
)
return task_result
except (TaskNotFoundError, TaskNotCompletedError):
raise
except Exception:
# the task shall be removed in this case
await tasks_manager.remove_task(
path_params.task_id, with_task_context=task_context, reraise_errors=False
)
raise
return await http_endpoint_responses.get_task_result(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
)


@routes.delete("/{task_id}", name="cancel_and_delete_task")
async def cancel_and_delete_task(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
await tasks_manager.remove_task(path_params.task_id, with_task_context=task_context)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
long_running_manager = get_long_running_manager(request.app)


__all__: tuple[str, ...] = (
"get_tasks_manager",
"TaskId",
"TaskGet",
"TaskStatus",
)
await http_endpoint_responses.remove_task(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
import asyncio
import logging
import datetime
from collections.abc import AsyncGenerator, Callable
from functools import wraps
from typing import Any

from aiohttp import web
from common_library.json_serialization import json_dumps
from pydantic import AnyHttpUrl, PositiveFloat, TypeAdapter
from pydantic import AnyHttpUrl, TypeAdapter

from ...aiohttp import status
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import (
TaskContext,
TaskProtocol,
TasksManager,
start_task,
from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task
from ..typing_extension import Handler
from . import _routes
from ._constants import (
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
MINUTE,
APP_LONG_RUNNING_MANAGER_KEY,
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)
from ._dependencies import create_task_name_from_request, get_tasks_manager
from ._error_handlers import base_long_running_error_handler

_logger = logging.getLogger(__name__)
from ._manager import AiohttpLongRunningManager, get_long_running_manager


def no_ops_decorator(handler: Handler):
Expand All @@ -42,6 +38,10 @@ async def _wrap(request: web.Request):
return _wrap


def _create_task_name_from_request(request: web.Request) -> str:
return f"{request.method} {request.rel_url}"


async def start_long_running_task(
# NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys
request_: web.Request,
Expand All @@ -51,12 +51,12 @@ async def start_long_running_task(
task_context: TaskContext,
**task_kwargs: Any,
) -> web.Response:
task_manager = get_tasks_manager(request_.app)
task_name = create_task_name_from_request(request_)
long_running_manager = get_long_running_manager(request_.app)
task_name = _create_task_name_from_request(request_)
task_id = None
try:
task_id = start_task(
task_manager,
long_running_manager.tasks_manager,
task_,
fire_and_forget=fire_and_forget,
task_context=task_context,
Expand Down Expand Up @@ -91,8 +91,10 @@ async def start_long_running_task(
except asyncio.CancelledError:
# cancel the task, the client has disconnected
if task_id:
task_manager = get_tasks_manager(request_.app)
await task_manager.cancel_task(task_id, with_task_context=None)
long_running_manager = get_long_running_manager(request_.app)
await long_running_manager.tasks_manager.cancel_task(
task_id, with_task_context=None
)
raise


Expand Down Expand Up @@ -121,8 +123,8 @@ def setup(
router_prefix: str,
handler_check_decorator: Callable = no_ops_decorator,
task_request_context_decorator: Callable = no_task_context_decorator,
stale_task_check_interval_s: PositiveFloat = 1 * MINUTE,
stale_task_detect_timeout_s: PositiveFloat = 5 * MINUTE,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
) -> None:
"""
- `router_prefix` APIs are mounted on `/...`, this
Expand All @@ -135,21 +137,24 @@ def setup(
"""

async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
# add error handlers
app.middlewares.append(base_long_running_error_handler)

# add components to state
app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] = long_running_task_manager = (
TasksManager(
stale_task_check_interval_s=stale_task_check_interval_s,
stale_task_detect_timeout_s=stale_task_detect_timeout_s,
app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = (
AiohttpLongRunningManager(
app=app,
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
)
)

# add error handlers
app.middlewares.append(base_long_running_error_handler)
await long_running_manager.setup()

yield

# cleanup
await long_running_task_manager.close()
await long_running_manager.teardown()

# add routing (done at setup-time)
_wrap_and_add_routes(
Expand Down
Loading
Loading