Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
52f3e66
moved start_task to TasksManager
Jun 12, 2025
9df9acd
refactored to remove job_name and merge unique in task_id
Jun 12, 2025
08d397e
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 12, 2025
d8365e8
fixed test
Jun 12, 2025
92eb372
reduce log level
Jun 13, 2025
835c2a9
refactor
Jun 13, 2025
910b47c
fixed issue
Jun 13, 2025
e6cbe66
fixed test
Jun 13, 2025
c3ab232
fixed warnings
Jun 13, 2025
b33189a
using dots as sperators
Jun 13, 2025
a4aa0d3
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 13, 2025
d9be27a
fixed specs
Jun 13, 2025
77855dd
updated note
Jun 13, 2025
01c58db
refactor to use registered task
Jun 13, 2025
5ca6450
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 13, 2025
a3a35b5
wired task_start via common interface
Jun 13, 2025
216c93a
refactor
Jun 13, 2025
0ed5b07
renamed module
Jun 13, 2025
6c9cae3
bring back task_name
Jun 13, 2025
1d81ced
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 13, 2025
69697c9
fixed broken tests
Jun 13, 2025
46b2ff3
updated specs
Jun 13, 2025
55bf414
hid internals
Jun 13, 2025
cd38439
refactor
Jun 13, 2025
bba46c8
refactor
Jun 13, 2025
d6885b2
refactor
Jun 13, 2025
d61ba00
fixed broken job name
Jun 13, 2025
888e10f
fixed validator
Jun 13, 2025
c3e7129
fixed broken
Jun 16, 2025
53d5791
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 16, 2025
479165b
revert change
Jun 16, 2025
1a40721
fixed
Jun 16, 2025
8544ee8
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 16, 2025
b916bbc
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 18, 2025
6a38a8b
feedback
Jun 18, 2025
cdfe157
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 18, 2025
4ba3509
fixed merge
Jun 18, 2025
07c905f
updated specs
Jun 18, 2025
c1adab9
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 16, 2025
d2b21f8
mege conflict edits
Jul 16, 2025
066db30
using previous message
Jul 16, 2025
ee599ab
Merge branch 'master' into pr-osparc-long-running-refactor-4
mergify[bot] Jul 18, 2025
a688579
merge branch 'pr-osparc-long-running-refactor-4' of github.com:GitHK/…
Jul 18, 2025
2a76198
updated specs
Jul 18, 2025
ca4949d
Merge branch 'master' into pr-osparc-long-running-refactor-4
mergify[bot] Jul 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import urllib.parse
from datetime import datetime
from typing import Any

from pydantic import BaseModel, field_validator
from pydantic import BaseModel

from .base import TaskId, TaskProgress

Expand All @@ -20,15 +19,9 @@ class TaskResult(BaseModel):

class TaskBase(BaseModel):
task_id: TaskId
task_name: str


class TaskGet(TaskBase):
status_href: str
result_href: str
abort_href: str

@field_validator("task_name")
@classmethod
def unquote_str(cls, v) -> str:
return urllib.parse.unquote(v)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel
from servicelib.aiohttp import status

from ...long_running_tasks import http_endpoint_responses
from ...long_running_tasks import lrt_api
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
from ..requests_validation import parse_request_path_parameters_as
from ..rest_responses import create_data_response
Expand All @@ -24,12 +24,11 @@ async def list_tasks(request: web.Request) -> web.Response:
[
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in http_endpoint_responses.list_tasks(
for t in lrt_api.list_tasks(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
)
Expand All @@ -42,7 +41,7 @@ async def get_task_status(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
long_running_manager = get_long_running_manager(request.app)

task_status: TaskStatus = http_endpoint_responses.get_task_status(
task_status: TaskStatus = lrt_api.get_task_status(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand All @@ -56,7 +55,7 @@ async def get_task_result(request: web.Request) -> web.Response | Any:
long_running_manager = get_long_running_manager(request.app)

# NOTE: this might raise an exception that will be catched by the _error_handlers
return await http_endpoint_responses.get_task_result(
return await lrt_api.get_task_result(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand All @@ -68,7 +67,7 @@ async def cancel_and_delete_task(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
long_running_manager = get_long_running_manager(request.app)

await http_endpoint_responses.remove_task(
await lrt_api.remove_task(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from pydantic import AnyHttpUrl, TypeAdapter

from ...aiohttp import status
from ...long_running_tasks import lrt_api
from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task
from ...long_running_tasks.task import RegisteredTaskName, TaskContext
from ..typing_extension import Handler
from . import _routes
from ._constants import (
Expand All @@ -25,11 +26,11 @@
from ._manager import AiohttpLongRunningManager, get_long_running_manager


def no_ops_decorator(handler: Handler):
def _no_ops_decorator(handler: Handler):
return handler


def no_task_context_decorator(handler: Handler):
def _no_task_context_decorator(handler: Handler):
@wraps(handler)
async def _wrap(request: web.Request):
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = {}
Expand All @@ -45,7 +46,7 @@ def _create_task_name_from_request(request: web.Request) -> str:
async def start_long_running_task(
# NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys
request_: web.Request,
task_: TaskProtocol,
registerd_task_name: RegisteredTaskName,
*,
fire_and_forget: bool = False,
task_context: TaskContext,
Expand All @@ -55,9 +56,9 @@ async def start_long_running_task(
task_name = _create_task_name_from_request(request_)
task_id = None
try:
task_id = start_task(
task_id = await lrt_api.start_task(
long_running_manager.tasks_manager,
task_,
registerd_task_name,
fire_and_forget=fire_and_forget,
task_context=task_context,
task_name=task_name,
Expand All @@ -78,7 +79,6 @@ async def start_long_running_task(
)
task_get = TaskGet(
task_id=task_id,
task_name=task_name,
status_href=f"{status_url}",
result_href=f"{result_url}",
abort_href=f"{abort_url}",
Expand Down Expand Up @@ -121,8 +121,8 @@ def setup(
app: web.Application,
*,
router_prefix: str,
handler_check_decorator: Callable = no_ops_decorator,
task_request_context_decorator: Callable = no_task_context_decorator,
handler_check_decorator: Callable = _no_ops_decorator,
task_request_context_decorator: Callable = _no_task_context_decorator,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from fastapi import APIRouter, Depends, Request, status

from ...long_running_tasks import http_endpoint_responses
from ...long_running_tasks import lrt_api
from ...long_running_tasks.models import TaskGet, TaskId, TaskResult, TaskStatus
from ..requests_decorators import cancel_on_disconnect
from ._dependencies import get_long_running_manager
Expand All @@ -23,14 +23,13 @@ async def list_tasks(
return [
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=str(request.url_for("get_task_status", task_id=t.task_id)),
result_href=str(request.url_for("get_task_result", task_id=t.task_id)),
abort_href=str(
request.url_for("cancel_and_delete_task", task_id=t.task_id)
),
)
for t in http_endpoint_responses.list_tasks(
for t in lrt_api.list_tasks(
long_running_manager.tasks_manager, task_context=None
)
]
Expand All @@ -52,7 +51,7 @@ async def get_task_status(
],
) -> TaskStatus:
assert request # nosec
return http_endpoint_responses.get_task_status(
return lrt_api.get_task_status(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
)

Expand All @@ -75,7 +74,7 @@ async def get_task_result(
],
) -> TaskResult | Any:
assert request # nosec
return await http_endpoint_responses.get_task_result(
return await lrt_api.get_task_result(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
)

Expand All @@ -98,6 +97,6 @@ async def cancel_and_delete_task(
],
) -> None:
assert request # nosec
await http_endpoint_responses.remove_task(
await lrt_api.remove_task(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ class BaseLongRunningError(OsparcErrorMixin, Exception):
"""base exception for this module"""


class TaskNotRegisteredError(BaseLongRunningError):
msg_template: str = (
"notask with task_name='{task_name}' was found in the task registry. "
"Make sure it's registered before starting it."
)


class TaskAlreadyRunningError(BaseLongRunningError):
msg_template: str = "{task_name} must be unique, found: '{managed_task}'"

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Any

from .models import TaskBase, TaskId, TaskStatus
from .task import RegisteredTaskName, TaskContext, TasksManager, TrackedTask


async def start_task(
tasks_manager: TasksManager,
registered_task_name: RegisteredTaskName,
*,
unique: bool = False,
task_context: TaskContext | None = None,
task_name: str | None = None,
fire_and_forget: bool = False,
**task_kwargs: Any,
) -> TaskId:
"""
Creates a background task from an async function.

An asyncio task will be created out of it by injecting a `TaskProgress` as the first
positional argument and adding all `handler_kwargs` as named parameters.

NOTE: the progress is automatically bounded between 0 and 1
NOTE: the `task` name must be unique in the module, otherwise when using
the unique parameter is True, it will not be able to distinguish between
them.

Args:
tasks_manager (TasksManager): the tasks manager
task (TaskProtocol): the tasks to be run in the background
unique (bool, optional): If True, then only one such named task may be run. Defaults to False.
task_context (Optional[TaskContext], optional): a task context storage can be retrieved during the task lifetime. Defaults to None.
task_name (Optional[str], optional): optional task name. Defaults to None.
fire_and_forget: if True, then the task will not be cancelled if the status is never called

Raises:
TaskAlreadyRunningError: if unique is True, will raise if more than 1 such named task is started

Returns:
TaskId: the task unique identifier
"""
return tasks_manager.start_task(
registered_task_name,
unique=unique,
task_context=task_context,
task_name=task_name,
fire_and_forget=fire_and_forget,
**task_kwargs,
)


def list_tasks(
tasks_manager: TasksManager, task_context: TaskContext | None
) -> list[TaskBase]:
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
with_task_context=task_context
)
return [TaskBase(task_id=t.task_id) for t in tracked_tasks]


def get_task_status(
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
) -> TaskStatus:
"""returns the status of a task"""
return tasks_manager.get_task_status(
task_id=task_id, with_task_context=task_context
)


async def get_task_result(
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
) -> Any:
"""retruns the result of a task, which is directly whatever the remove hanlder returned"""
try:
return tasks_manager.get_task_result(
task_id=task_id, with_task_context=task_context
)
finally:
# the task is always removed even if an error occurs
await tasks_manager.remove_task(
task_id, with_task_context=task_context, reraise_errors=False
)


async def remove_task(
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
) -> None:
"""removes / cancels a task"""
await tasks_manager.remove_task(task_id, with_task_context=task_context)
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
)
from pydantic import BaseModel, ConfigDict, Field, PositiveFloat

TaskName: TypeAlias = str

TaskType: TypeAlias = Callable[..., Coroutine[Any, Any, Any]]

ProgressCallback: TypeAlias = Callable[
Expand All @@ -33,7 +31,6 @@
class TrackedTask(BaseModel):
task_id: str
task: Task
task_name: TaskName
task_progress: TaskProgress
# NOTE: this context lifetime is with the tracked task (similar to aiohttp storage concept)
task_context: dict[str, Any]
Expand Down
Loading
Loading