Skip to content

Commit ae5f894

Browse files
GitHKAndrei Neagumergify[bot]
authored
♻️ preparing TasksManager's interface to be extracted into a common interface (ITISFoundation#7884)
Co-authored-by: Andrei Neagu <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent e805af6 commit ae5f894

File tree

30 files changed

+595
-433
lines changed

30 files changed

+595
-433
lines changed

packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
from datetime import datetime
33
from typing import Any
44

5-
from pydantic import BaseModel, field_validator
5+
from common_library.exclude import Unset
6+
from pydantic import BaseModel, ConfigDict, model_validator
67

78
from .base import TaskId, TaskProgress
89

@@ -20,15 +21,30 @@ class TaskResult(BaseModel):
2021

2122
class TaskBase(BaseModel):
2223
task_id: TaskId
23-
task_name: str
24+
task_name: str | Unset = Unset.VALUE
25+
26+
@model_validator(mode="after")
27+
def try_populate_task_name_from_task_id(self) -> "TaskBase":
28+
# NOTE: currently this model is used to validate tasks coming from
29+
# the celery backend and form long_running_tasks
30+
# 1. if a task comes from Celery, it will keep it's given name
31+
# 2. if a task comes from long_running_tasks, it will extract it form
32+
# the task_id, which looks like "{PREFIX}.{TASK_NAME}.UNIQUE|{UUID}"
33+
34+
if self.task_id and self.task_name == Unset.VALUE:
35+
parts = self.task_id.split(".")
36+
if len(parts) > 1:
37+
self.task_name = urllib.parse.unquote(parts[1])
38+
39+
if self.task_name == Unset.VALUE:
40+
self.task_name = self.task_id
41+
42+
return self
43+
44+
model_config = ConfigDict(arbitrary_types_allowed=True)
2445

2546

2647
class TaskGet(TaskBase):
2748
status_href: str
2849
result_href: str
2950
abort_href: str
30-
31-
@field_validator("task_name")
32-
@classmethod
33-
def unquote_str(cls, v) -> str:
34-
return urllib.parse.unquote(v)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import pytest
2+
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
3+
from pydantic import TypeAdapter
4+
5+
6+
def _get_data_without_task_name(task_id: str) -> dict:
7+
return {
8+
"task_id": task_id,
9+
"status_href": "",
10+
"result_href": "",
11+
"abort_href": "",
12+
}
13+
14+
15+
@pytest.mark.parametrize(
16+
"data, expected_task_name",
17+
[
18+
(_get_data_without_task_name("a.b.c.d"), "b"),
19+
(_get_data_without_task_name("a.b.c"), "b"),
20+
(_get_data_without_task_name("a.b"), "b"),
21+
(_get_data_without_task_name("a"), "a"),
22+
],
23+
)
24+
def test_try_extract_task_name(data: dict, expected_task_name: str) -> None:
25+
task_get = TaskGet(**data)
26+
assert task_get.task_name == expected_task_name
27+
28+
task_get = TypeAdapter(TaskGet).validate_python(data)
29+
assert task_get.task_name == expected_task_name
30+
31+
32+
def _get_data_with_task_name(task_id: str, task_name: str) -> dict:
33+
return {
34+
"task_id": task_id,
35+
"task_name": task_name,
36+
"status_href": "",
37+
"result_href": "",
38+
"abort_href": "",
39+
}
40+
41+
42+
@pytest.mark.parametrize(
43+
"data, expected_task_name",
44+
[
45+
(_get_data_with_task_name("a.b.c.d", "a_name"), "a_name"),
46+
(_get_data_with_task_name("a.b.c", "a_name"), "a_name"),
47+
(_get_data_with_task_name("a.b", "a_name"), "a_name"),
48+
(_get_data_with_task_name("a", "a_name"), "a_name"),
49+
],
50+
)
51+
def test_task_name_is_provided(data: dict, expected_task_name: str) -> None:
52+
task_get = TaskGet(**data)
53+
assert task_get.task_name == expected_task_name
54+
55+
task_get = TypeAdapter(TaskGet).validate_python(data)
56+
assert task_get.task_name == expected_task_name

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pydantic import BaseModel
55
from servicelib.aiohttp import status
66

7-
from ...long_running_tasks import http_endpoint_responses
7+
from ...long_running_tasks import lrt_api
88
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
99
from ..requests_validation import parse_request_path_parameters_as
1010
from ..rest_responses import create_data_response
@@ -24,12 +24,11 @@ async def list_tasks(request: web.Request) -> web.Response:
2424
[
2525
TaskGet(
2626
task_id=t.task_id,
27-
task_name=t.task_name,
2827
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
2928
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
3029
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
3130
)
32-
for t in http_endpoint_responses.list_tasks(
31+
for t in lrt_api.list_tasks(
3332
long_running_manager.tasks_manager,
3433
long_running_manager.get_task_context(request),
3534
)
@@ -42,7 +41,7 @@ async def get_task_status(request: web.Request) -> web.Response:
4241
path_params = parse_request_path_parameters_as(_PathParam, request)
4342
long_running_manager = get_long_running_manager(request.app)
4443

45-
task_status: TaskStatus = http_endpoint_responses.get_task_status(
44+
task_status: TaskStatus = lrt_api.get_task_status(
4645
long_running_manager.tasks_manager,
4746
long_running_manager.get_task_context(request),
4847
path_params.task_id,
@@ -56,7 +55,7 @@ async def get_task_result(request: web.Request) -> web.Response | Any:
5655
long_running_manager = get_long_running_manager(request.app)
5756

5857
# NOTE: this might raise an exception that will be catched by the _error_handlers
59-
return await http_endpoint_responses.get_task_result(
58+
return await lrt_api.get_task_result(
6059
long_running_manager.tasks_manager,
6160
long_running_manager.get_task_context(request),
6261
path_params.task_id,
@@ -68,7 +67,7 @@ async def cancel_and_delete_task(request: web.Request) -> web.Response:
6867
path_params = parse_request_path_parameters_as(_PathParam, request)
6968
long_running_manager = get_long_running_manager(request.app)
7069

71-
await http_endpoint_responses.remove_task(
70+
await lrt_api.remove_task(
7271
long_running_manager.tasks_manager,
7372
long_running_manager.get_task_context(request),
7473
path_params.task_id,

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
from pydantic import AnyHttpUrl, TypeAdapter
1010

1111
from ...aiohttp import status
12+
from ...long_running_tasks import lrt_api
1213
from ...long_running_tasks.constants import (
1314
DEFAULT_STALE_TASK_CHECK_INTERVAL,
1415
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
1516
)
1617
from ...long_running_tasks.models import TaskGet
17-
from ...long_running_tasks.task import TaskContext, TaskProtocol, start_task
18+
from ...long_running_tasks.task import RegisteredTaskName, TaskContext
1819
from ..typing_extension import Handler
1920
from . import _routes
2021
from ._constants import (
@@ -25,11 +26,11 @@
2526
from ._manager import AiohttpLongRunningManager, get_long_running_manager
2627

2728

28-
def no_ops_decorator(handler: Handler):
29+
def _no_ops_decorator(handler: Handler):
2930
return handler
3031

3132

32-
def no_task_context_decorator(handler: Handler):
33+
def _no_task_context_decorator(handler: Handler):
3334
@wraps(handler)
3435
async def _wrap(request: web.Request):
3536
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = {}
@@ -45,7 +46,7 @@ def _create_task_name_from_request(request: web.Request) -> str:
4546
async def start_long_running_task(
4647
# NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys
4748
request_: web.Request,
48-
task_: TaskProtocol,
49+
registerd_task_name: RegisteredTaskName,
4950
*,
5051
fire_and_forget: bool = False,
5152
task_context: TaskContext,
@@ -55,9 +56,9 @@ async def start_long_running_task(
5556
task_name = _create_task_name_from_request(request_)
5657
task_id = None
5758
try:
58-
task_id = start_task(
59+
task_id = await lrt_api.start_task(
5960
long_running_manager.tasks_manager,
60-
task_,
61+
registerd_task_name,
6162
fire_and_forget=fire_and_forget,
6263
task_context=task_context,
6364
task_name=task_name,
@@ -78,7 +79,6 @@ async def start_long_running_task(
7879
)
7980
task_get = TaskGet(
8081
task_id=task_id,
81-
task_name=task_name,
8282
status_href=f"{status_url}",
8383
result_href=f"{result_url}",
8484
abort_href=f"{abort_url}",
@@ -121,8 +121,8 @@ def setup(
121121
app: web.Application,
122122
*,
123123
router_prefix: str,
124-
handler_check_decorator: Callable = no_ops_decorator,
125-
task_request_context_decorator: Callable = no_task_context_decorator,
124+
handler_check_decorator: Callable = _no_ops_decorator,
125+
task_request_context_decorator: Callable = _no_task_context_decorator,
126126
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
127127
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
128128
) -> None:

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from fastapi import APIRouter, Depends, Request, status
44

5-
from ...long_running_tasks import http_endpoint_responses
5+
from ...long_running_tasks import lrt_api
66
from ...long_running_tasks.models import TaskGet, TaskId, TaskResult, TaskStatus
77
from ..requests_decorators import cancel_on_disconnect
88
from ._dependencies import get_long_running_manager
@@ -23,14 +23,13 @@ async def list_tasks(
2323
return [
2424
TaskGet(
2525
task_id=t.task_id,
26-
task_name=t.task_name,
2726
status_href=str(request.url_for("get_task_status", task_id=t.task_id)),
2827
result_href=str(request.url_for("get_task_result", task_id=t.task_id)),
2928
abort_href=str(
3029
request.url_for("cancel_and_delete_task", task_id=t.task_id)
3130
),
3231
)
33-
for t in http_endpoint_responses.list_tasks(
32+
for t in lrt_api.list_tasks(
3433
long_running_manager.tasks_manager, task_context=None
3534
)
3635
]
@@ -52,7 +51,7 @@ async def get_task_status(
5251
],
5352
) -> TaskStatus:
5453
assert request # nosec
55-
return http_endpoint_responses.get_task_status(
54+
return lrt_api.get_task_status(
5655
long_running_manager.tasks_manager, task_context=None, task_id=task_id
5756
)
5857

@@ -75,7 +74,7 @@ async def get_task_result(
7574
],
7675
) -> TaskResult | Any:
7776
assert request # nosec
78-
return await http_endpoint_responses.get_task_result(
77+
return await lrt_api.get_task_result(
7978
long_running_manager.tasks_manager, task_context=None, task_id=task_id
8079
)
8180

@@ -98,6 +97,6 @@ async def cancel_and_delete_task(
9897
],
9998
) -> None:
10099
assert request # nosec
101-
await http_endpoint_responses.remove_task(
100+
await lrt_api.remove_task(
102101
long_running_manager.tasks_manager, task_context=None, task_id=task_id
103102
)

packages/service-library/src/servicelib/logging_errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def create_troubleshootting_log_kwargs(
7474
...
7575
except MyException as exc
7676
_logger.exception(
77-
**create_troubleshotting_log_kwargs(
77+
**create_troubleshootting_log_kwargs(
7878
user_error_msg=frontend_msg,
7979
error=exc,
8080
error_context={

packages/service-library/src/servicelib/long_running_tasks/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ class BaseLongRunningError(OsparcErrorMixin, Exception):
55
"""base exception for this module"""
66

77

8+
class TaskNotRegisteredError(BaseLongRunningError):
9+
msg_template: str = (
10+
"no task with task_name='{task_name}' was found in the task registry. "
11+
"Make sure it's registered before starting it."
12+
)
13+
14+
815
class TaskAlreadyRunningError(BaseLongRunningError):
916
msg_template: str = "{task_name} must be unique, found: '{managed_task}'"
1017

packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)