Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
52f3e66
moved start_task to TasksManager
Jun 12, 2025
9df9acd
refactored to remove job_name and merge unique in task_id
Jun 12, 2025
08d397e
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 12, 2025
d8365e8
fixed test
Jun 12, 2025
92eb372
reduce log level
Jun 13, 2025
835c2a9
refactor
Jun 13, 2025
910b47c
fixed issue
Jun 13, 2025
e6cbe66
fixed test
Jun 13, 2025
c3ab232
fixed warnings
Jun 13, 2025
b33189a
using dots as sperators
Jun 13, 2025
a4aa0d3
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 13, 2025
d9be27a
fixed specs
Jun 13, 2025
77855dd
updated note
Jun 13, 2025
01c58db
refactor to use registered task
Jun 13, 2025
5ca6450
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 13, 2025
a3a35b5
wired task_start via common interface
Jun 13, 2025
216c93a
refactor
Jun 13, 2025
0ed5b07
renamed module
Jun 13, 2025
6c9cae3
bring back task_name
Jun 13, 2025
1d81ced
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 13, 2025
69697c9
fixed broken tests
Jun 13, 2025
46b2ff3
updated specs
Jun 13, 2025
55bf414
hid internals
Jun 13, 2025
cd38439
refactor
Jun 13, 2025
bba46c8
refactor
Jun 13, 2025
d6885b2
refactor
Jun 13, 2025
d61ba00
fixed broken job name
Jun 13, 2025
888e10f
fixed validator
Jun 13, 2025
c3e7129
fixed broken
Jun 16, 2025
53d5791
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 16, 2025
479165b
revert change
Jun 16, 2025
1a40721
fixed
Jun 16, 2025
8544ee8
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 16, 2025
b916bbc
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 18, 2025
6a38a8b
feedback
Jun 18, 2025
cdfe157
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 18, 2025
4ba3509
fixed merge
Jun 18, 2025
07c905f
updated specs
Jun 18, 2025
c1adab9
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 16, 2025
d2b21f8
mege conflict edits
Jul 16, 2025
066db30
using previous message
Jul 16, 2025
ee599ab
Merge branch 'master' into pr-osparc-long-running-refactor-4
mergify[bot] Jul 18, 2025
a688579
merge branch 'pr-osparc-long-running-refactor-4' of github.com:GitHK/…
Jul 18, 2025
2a76198
updated specs
Jul 18, 2025
ca4949d
Merge branch 'master' into pr-osparc-long-running-refactor-4
mergify[bot] Jul 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from datetime import datetime
from typing import Any

from pydantic import BaseModel, field_validator
from common_library.exclude import Unset
from pydantic import BaseModel, ConfigDict, model_validator

from .base import TaskId, TaskProgress

Expand All @@ -20,15 +21,30 @@ class TaskResult(BaseModel):

class TaskBase(BaseModel):
task_id: TaskId
task_name: str
task_name: str | Unset = Unset.VALUE

@model_validator(mode="after")
def try_populate_task_name_from_task_id(self) -> "TaskBase":
# NOTE: currently this model is used to validate tasks coming from
# the celery backend and form long_running_tasks
# 1. if a task comes from Celery, it will keep it's given name
# 2. if a task comes from long_running_tasks, it will extract it form
# the task_id, which looks like "{PREFIX}.{TASK_NAME}.UNIQUE|{UUID}"

if self.task_id and self.task_name == Unset.VALUE:
parts = self.task_id.split(".")
if len(parts) > 1:
self.task_name = urllib.parse.unquote(parts[1])

if self.task_name == Unset.VALUE:
self.task_name = self.task_id

return self

model_config = ConfigDict(arbitrary_types_allowed=True)


class TaskGet(TaskBase):
status_href: str
result_href: str
abort_href: str

@field_validator("task_name")
@classmethod
def unquote_str(cls, v) -> str:
return urllib.parse.unquote(v)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
from pydantic import TypeAdapter


def _get_data_without_task_name(task_id: str) -> dict:
return {
"task_id": task_id,
"status_href": "",
"result_href": "",
"abort_href": "",
}


@pytest.mark.parametrize(
"data, expected_task_name",
[
(_get_data_without_task_name("a.b.c.d"), "b"),
(_get_data_without_task_name("a.b.c"), "b"),
(_get_data_without_task_name("a.b"), "b"),
(_get_data_without_task_name("a"), "a"),
],
)
def test_try_extract_task_name(data: dict, expected_task_name: str) -> None:
task_get = TaskGet(**data)
assert task_get.task_name == expected_task_name

task_get = TypeAdapter(TaskGet).validate_python(data)
assert task_get.task_name == expected_task_name


def _get_data_with_task_name(task_id: str, task_name: str) -> dict:
return {
"task_id": task_id,
"task_name": task_name,
"status_href": "",
"result_href": "",
"abort_href": "",
}


@pytest.mark.parametrize(
"data, expected_task_name",
[
(_get_data_with_task_name("a.b.c.d", "a_name"), "a_name"),
(_get_data_with_task_name("a.b.c", "a_name"), "a_name"),
(_get_data_with_task_name("a.b", "a_name"), "a_name"),
(_get_data_with_task_name("a", "a_name"), "a_name"),
],
)
def test_task_name_is_provided(data: dict, expected_task_name: str) -> None:
task_get = TaskGet(**data)
assert task_get.task_name == expected_task_name

task_get = TypeAdapter(TaskGet).validate_python(data)
assert task_get.task_name == expected_task_name
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel
from servicelib.aiohttp import status

from ...long_running_tasks import http_endpoint_responses
from ...long_running_tasks import lrt_api
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
from ..requests_validation import parse_request_path_parameters_as
from ..rest_responses import create_data_response
Expand All @@ -24,12 +24,11 @@ async def list_tasks(request: web.Request) -> web.Response:
[
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in http_endpoint_responses.list_tasks(
for t in lrt_api.list_tasks(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
)
Expand All @@ -42,7 +41,7 @@ async def get_task_status(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
long_running_manager = get_long_running_manager(request.app)

task_status: TaskStatus = http_endpoint_responses.get_task_status(
task_status: TaskStatus = lrt_api.get_task_status(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand All @@ -56,7 +55,7 @@ async def get_task_result(request: web.Request) -> web.Response | Any:
long_running_manager = get_long_running_manager(request.app)

# NOTE: this might raise an exception that will be catched by the _error_handlers
return await http_endpoint_responses.get_task_result(
return await lrt_api.get_task_result(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand All @@ -68,7 +67,7 @@ async def cancel_and_delete_task(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
long_running_manager = get_long_running_manager(request.app)

await http_endpoint_responses.remove_task(
await lrt_api.remove_task(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from pydantic import AnyHttpUrl, TypeAdapter

from ...aiohttp import status
from ...long_running_tasks import lrt_api
from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task
from ...long_running_tasks.task import RegisteredTaskName, TaskContext
from ..typing_extension import Handler
from . import _routes
from ._constants import (
Expand All @@ -25,11 +26,11 @@
from ._manager import AiohttpLongRunningManager, get_long_running_manager


def no_ops_decorator(handler: Handler):
def _no_ops_decorator(handler: Handler):
return handler


def no_task_context_decorator(handler: Handler):
def _no_task_context_decorator(handler: Handler):
@wraps(handler)
async def _wrap(request: web.Request):
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = {}
Expand All @@ -45,7 +46,7 @@ def _create_task_name_from_request(request: web.Request) -> str:
async def start_long_running_task(
# NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys
request_: web.Request,
task_: TaskProtocol,
registerd_task_name: RegisteredTaskName,
*,
fire_and_forget: bool = False,
task_context: TaskContext,
Expand All @@ -55,9 +56,9 @@ async def start_long_running_task(
task_name = _create_task_name_from_request(request_)
task_id = None
try:
task_id = start_task(
task_id = await lrt_api.start_task(
long_running_manager.tasks_manager,
task_,
registerd_task_name,
fire_and_forget=fire_and_forget,
task_context=task_context,
task_name=task_name,
Expand All @@ -78,7 +79,6 @@ async def start_long_running_task(
)
task_get = TaskGet(
task_id=task_id,
task_name=task_name,
status_href=f"{status_url}",
result_href=f"{result_url}",
abort_href=f"{abort_url}",
Expand Down Expand Up @@ -121,8 +121,8 @@ def setup(
app: web.Application,
*,
router_prefix: str,
handler_check_decorator: Callable = no_ops_decorator,
task_request_context_decorator: Callable = no_task_context_decorator,
handler_check_decorator: Callable = _no_ops_decorator,
task_request_context_decorator: Callable = _no_task_context_decorator,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from fastapi import APIRouter, Depends, Request, status

from ...long_running_tasks import http_endpoint_responses
from ...long_running_tasks import lrt_api
from ...long_running_tasks.models import TaskGet, TaskId, TaskResult, TaskStatus
from ..requests_decorators import cancel_on_disconnect
from ._dependencies import get_long_running_manager
Expand All @@ -23,14 +23,13 @@ async def list_tasks(
return [
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=str(request.url_for("get_task_status", task_id=t.task_id)),
result_href=str(request.url_for("get_task_result", task_id=t.task_id)),
abort_href=str(
request.url_for("cancel_and_delete_task", task_id=t.task_id)
),
)
for t in http_endpoint_responses.list_tasks(
for t in lrt_api.list_tasks(
long_running_manager.tasks_manager, task_context=None
)
]
Expand All @@ -52,7 +51,7 @@ async def get_task_status(
],
) -> TaskStatus:
assert request # nosec
return http_endpoint_responses.get_task_status(
return lrt_api.get_task_status(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
)

Expand All @@ -75,7 +74,7 @@ async def get_task_result(
],
) -> TaskResult | Any:
assert request # nosec
return await http_endpoint_responses.get_task_result(
return await lrt_api.get_task_result(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
)

Expand All @@ -98,6 +97,6 @@ async def cancel_and_delete_task(
],
) -> None:
assert request # nosec
await http_endpoint_responses.remove_task(
await lrt_api.remove_task(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
)
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def create_troubleshootting_log_kwargs(
...
except MyException as exc
_logger.exception(
**create_troubleshotting_log_kwargs(
**create_troubleshootting_log_kwargs(
user_error_msg=frontend_msg,
error=exc,
error_context={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ class BaseLongRunningError(OsparcErrorMixin, Exception):
"""base exception for this module"""


class TaskNotRegisteredError(BaseLongRunningError):
msg_template: str = (
"no task with task_name='{task_name}' was found in the task registry. "
"Make sure it's registered before starting it."
)


class TaskAlreadyRunningError(BaseLongRunningError):
msg_template: str = "{task_name} must be unique, found: '{managed_task}'"

Expand Down

This file was deleted.

Loading
Loading