Skip to content

Commit 2350f9a

Browse files
GitHKAndrei NeaguCopilotmergify[bot]
authored
♻️ Unifying long_running_tasks interfaces (#7697)
Co-authored-by: Andrei Neagu <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent c2cf892 commit 2350f9a

File tree

15 files changed

+52
-167
lines changed

15 files changed

+52
-167
lines changed

packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
import functools
33
import logging
44
import warnings
5-
from typing import Any, Awaitable, Callable, Final
5+
from collections.abc import Awaitable, Callable
6+
from typing import Any, Final
67

78
from fastapi import FastAPI, status
89
from httpx import AsyncClient, HTTPError
@@ -13,13 +14,8 @@
1314
from tenacity.stop import stop_after_attempt
1415
from tenacity.wait import wait_exponential
1516

16-
from ...long_running_tasks._errors import GenericClientError, TaskClientResultError
17-
from ...long_running_tasks._models import (
18-
ClientConfiguration,
19-
TaskId,
20-
TaskResult,
21-
TaskStatus,
22-
)
17+
from ...long_running_tasks._errors import GenericClientError
18+
from ...long_running_tasks._models import ClientConfiguration, TaskId, TaskStatus
2319

2420
DEFAULT_HTTP_REQUESTS_TIMEOUT: Final[PositiveFloat] = 15
2521

@@ -85,7 +81,7 @@ def log_it(retry_state: RetryCallState) -> None:
8581

8682

8783
def retry_on_http_errors(
88-
request_func: Callable[..., Awaitable[Any]]
84+
request_func: Callable[..., Awaitable[Any]],
8985
) -> Callable[..., Awaitable[Any]]:
9086
"""
9187
Will retry the request on `httpx.HTTPError`.
@@ -173,10 +169,7 @@ async def get_task_result(
173169
body=result.text,
174170
)
175171

176-
task_result = TaskResult.model_validate(result.json())
177-
if task_result.error is not None:
178-
raise TaskClientResultError(message=task_result.error)
179-
return task_result.result
172+
return result.json()
180173

181174
@retry_on_http_errors
182175
async def cancel_and_delete_task(

packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import asyncio
22
from asyncio.log import logger
3+
from collections.abc import AsyncIterator
34
from contextlib import asynccontextmanager
4-
from typing import Any, AsyncIterator, Final
5+
from typing import Any, Final
56

67
from pydantic import PositiveFloat
78

@@ -87,8 +88,7 @@ async def periodic_task_result(
8788
- `status_poll_interval` optional: when waiting for a task to finish,
8889
how frequent should the server be queried
8990
90-
raises: `TaskClientResultError` if the task finished with an error instead of
91-
the expected result
91+
raises: the original expcetion the task raised, if any
9292
raises: `asyncio.TimeoutError` NOTE: the remote task will also be removed
9393
"""
9494

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Annotated, Any
22

3-
from fastapi import APIRouter, Depends, Query, Request, status
3+
from fastapi import APIRouter, Depends, Request, status
44

55
from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
66
from ...long_running_tasks._models import TaskGet, TaskId, TaskResult, TaskStatus
@@ -60,16 +60,10 @@ async def get_task_result(
6060
request: Request,
6161
task_id: TaskId,
6262
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
63-
*,
64-
return_exception: Annotated[bool, Query()] = False,
6563
) -> TaskResult | Any:
6664
assert request # nosec
67-
# TODO: refactor this to use same as in https://github.com/ITISFoundation/osparc-simcore/issues/3265
6865
try:
69-
if return_exception:
70-
task_result = tasks_manager.get_task_result(task_id, with_task_context=None)
71-
else:
72-
task_result = tasks_manager.get_task_result_old(task_id=task_id)
66+
task_result = tasks_manager.get_task_result(task_id, with_task_context=None)
7367
await tasks_manager.remove_task(
7468
task_id, with_task_context=None, reraise_errors=False
7569
)

packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
import httpx
1111
from fastapi import status
1212
from models_library.api_schemas_long_running_tasks.base import TaskProgress
13-
from models_library.api_schemas_long_running_tasks.tasks import TaskGet, TaskStatus
13+
from models_library.api_schemas_long_running_tasks.tasks import (
14+
TaskGet,
15+
TaskResult,
16+
TaskStatus,
17+
)
1418
from tenacity import (
1519
AsyncRetrying,
1620
TryAgain,
@@ -23,7 +27,6 @@
2327
from yarl import URL
2428

2529
from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR
26-
from ...long_running_tasks._errors import TaskClientResultError
2730
from ...long_running_tasks._models import (
2831
ClientConfiguration,
2932
LRTask,
@@ -32,7 +35,7 @@
3235
ProgressPercent,
3336
RequestBody,
3437
)
35-
from ...long_running_tasks._task import TaskId, TaskResult
38+
from ...long_running_tasks._task import TaskId
3639
from ...rest_responses import unwrap_envelope_if_required
3740
from ._client import DEFAULT_HTTP_REQUESTS_TIMEOUT, Client, setup
3841
from ._context_manager import periodic_task_result
@@ -97,7 +100,7 @@ async def _wait_for_completion(
97100

98101
@retry(**_DEFAULT_FASTAPI_RETRY_POLICY)
99102
async def _task_result(session: httpx.AsyncClient, result_url: URL) -> Any:
100-
response = await session.get(f"{result_url}", params={"return_exception": True})
103+
response = await session.get(f"{result_url}")
101104
response.raise_for_status()
102105
if response.status_code != status.HTTP_204_NO_CONTENT:
103106
return unwrap_envelope_if_required(response.json())
@@ -155,7 +158,6 @@ async def long_running_task_request(
155158
"ProgressCallback",
156159
"ProgressMessage",
157160
"ProgressPercent",
158-
"TaskClientResultError",
159161
"TaskId",
160162
"TaskResult",
161163
"periodic_task_result",

packages/service-library/src/servicelib/fastapi/long_running_tasks/server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
running task. The client will take care of recovering the result from it.
77
"""
88

9+
from models_library.api_schemas_long_running_tasks.tasks import TaskResult
10+
911
from ...long_running_tasks._errors import TaskAlreadyRunningError, TaskCancelledError
1012
from ...long_running_tasks._task import (
1113
TaskId,
1214
TaskProgress,
13-
TaskResult,
1415
TasksManager,
1516
TaskStatus,
1617
start_task,

packages/service-library/src/servicelib/long_running_tasks/_errors.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,36 @@
44
class BaseLongRunningError(OsparcErrorMixin, Exception):
55
"""base exception for this module"""
66

7-
code: str = "long_running_task.base_long_running_error" # type: ignore[assignment]
8-
97

108
class TaskAlreadyRunningError(BaseLongRunningError):
11-
code: str = "long_running_task.task_already_running"
129
msg_template: str = "{task_name} must be unique, found: '{managed_task}'"
1310

1411

1512
class TaskNotFoundError(BaseLongRunningError):
16-
code: str = "long_running_task.task_not_found"
1713
msg_template: str = "No task with {task_id} found"
1814

1915

2016
class TaskNotCompletedError(BaseLongRunningError):
21-
code: str = "long_running_task.task_not_completed"
2217
msg_template: str = "Task {task_id} has not finished yet"
2318

2419

2520
class TaskCancelledError(BaseLongRunningError):
26-
code: str = "long_running_task.task_cancelled_error"
2721
msg_template: str = "Task {task_id} was cancelled before completing"
2822

2923

3024
class TaskExceptionError(BaseLongRunningError):
31-
code: str = "long_running_task.task_exception_error"
3225
msg_template: str = (
3326
"Task {task_id} finished with exception: '{exception}'\n{traceback}"
3427
)
3528

3629

3730
class TaskClientTimeoutError(BaseLongRunningError):
38-
code: str = "long_running_task.client.timed_out_waiting_for_response"
3931
msg_template: str = (
4032
"Timed out after {timeout} seconds while awaiting '{task_id}' to complete"
4133
)
4234

4335

4436
class GenericClientError(BaseLongRunningError):
45-
code: str = "long_running_task.client.generic_error"
4637
msg_template: str = (
4738
"Unexpected error while '{action}' for '{task_id}': status={status} body={body}"
4839
)
49-
50-
51-
class TaskClientResultError(BaseLongRunningError):
52-
code: str = "long_running_task.client.task_raised_error"
53-
msg_template: str = "{message}"

packages/service-library/src/servicelib/long_running_tasks/_task.py

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
TaskNotCompletedError,
2323
TaskNotFoundError,
2424
)
25-
from ._models import TaskId, TaskName, TaskResult, TaskStatus, TrackedTask
25+
from ._models import TaskId, TaskName, TaskStatus, TrackedTask
2626

2727
logger = logging.getLogger(__name__)
2828

@@ -241,36 +241,6 @@ def get_task_result(
241241
# the task was cancelled
242242
raise TaskCancelledError(task_id=task_id) from exc
243243

244-
def get_task_result_old(self, task_id: TaskId) -> TaskResult:
245-
"""
246-
returns: the result of the task
247-
248-
raises TaskNotFoundError if the task cannot be found
249-
"""
250-
tracked_task = self._get_tracked_task(task_id, {})
251-
252-
if not tracked_task.task.done():
253-
raise TaskNotCompletedError(task_id=task_id)
254-
255-
error: TaskExceptionError | TaskCancelledError
256-
try:
257-
exception = tracked_task.task.exception()
258-
if exception is not None:
259-
formatted_traceback = "\n".join(
260-
traceback.format_tb(exception.__traceback__)
261-
)
262-
error = TaskExceptionError(
263-
task_id=task_id, exception=exception, traceback=formatted_traceback
264-
)
265-
logger.warning("Task %s finished with error: %s", task_id, f"{error}")
266-
return TaskResult(result=None, error=f"{error}")
267-
except asyncio.CancelledError:
268-
error = TaskCancelledError(task_id=task_id)
269-
logger.warning("Task %s was cancelled", task_id)
270-
return TaskResult(result=None, error=f"{error}")
271-
272-
return TaskResult(result=tracked_task.task.result(), error=None)
273-
274244
async def cancel_task(
275245
self, task_id: TaskId, with_task_context: TaskContext | None
276246
) -> None:
@@ -354,12 +324,12 @@ async def close(self) -> None:
354324

355325

356326
class TaskProtocol(Protocol):
357-
async def __call__(self, progress: TaskProgress, *args: Any, **kwargs: Any) -> Any:
358-
...
327+
async def __call__(
328+
self, progress: TaskProgress, *args: Any, **kwargs: Any
329+
) -> Any: ...
359330

360331
@property
361-
def __name__(self) -> str:
362-
...
332+
def __name__(self) -> str: ...
363333

364334

365335
def start_task(
@@ -449,7 +419,5 @@ async def _progress_task(progress: TaskProgress, handler: TaskProtocol):
449419
"TaskProgress",
450420
"TaskProtocol",
451421
"TaskStatus",
452-
"TaskResult",
453422
"TrackedTask",
454-
"TaskResult",
455423
)

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ async def _caller(app: FastAPI, client: AsyncClient, **query_kwargs) -> TaskId:
103103

104104

105105
@pytest.fixture
106-
def wait_for_task() -> Callable[
107-
[FastAPI, AsyncClient, TaskId, TaskContext], Awaitable[None]
108-
]:
106+
def wait_for_task() -> (
107+
Callable[[FastAPI, AsyncClient, TaskId, TaskContext], Awaitable[None]]
108+
):
109109
async def _waiter(
110110
app: FastAPI,
111111
client: AsyncClient,
@@ -183,9 +183,7 @@ async def test_workflow(
183183
result = await client.get(f"{result_url}")
184184
# NOTE: this is DIFFERENT than with aiohttp where we return the real result
185185
assert result.status_code == status.HTTP_200_OK
186-
task_result = long_running_tasks.server.TaskResult.model_validate(result.json())
187-
assert not task_result.error
188-
assert task_result.result == [f"{x}" for x in range(10)]
186+
assert result.json() == [f"{x}" for x in range(10)]
189187
# getting the result again should raise a 404
190188
result = await client.get(result_url)
191189
assert result.status_code == status.HTTP_404_NOT_FOUND
@@ -220,19 +218,9 @@ async def test_failing_task_returns_error(
220218
await wait_for_task(app, client, task_id, {})
221219
# get the result
222220
result_url = app.url_path_for("get_task_result", task_id=task_id)
223-
result = await client.get(f"{result_url}")
224-
assert result.status_code == status.HTTP_200_OK
225-
task_result = long_running_tasks.server.TaskResult.model_validate(result.json())
226-
227-
assert not task_result.result
228-
assert task_result.error
229-
assert task_result.error.startswith(f"Task {task_id} finished with exception: ")
230-
assert 'raise RuntimeError("We were asked to fail!!")' in task_result.error
231-
# NOTE: this is not yet happening with fastapi version of long running task
232-
# assert "errors" in task_result.error
233-
# assert len(task_result.error["errors"]) == 1
234-
# assert task_result.error["errors"][0]["code"] == "RuntimeError"
235-
# assert task_result.error["errors"][0]["message"] == "We were asked to fail!!"
221+
with pytest.raises(RuntimeError) as exec_info:
222+
await client.get(f"{result_url}")
223+
assert f"{exec_info.value}" == "We were asked to fail!!"
236224

237225

238226
async def test_get_results_before_tasks_finishes_returns_404(

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
# pylint: disable=unused-argument
33

44
import asyncio
5-
from typing import AsyncIterable, Final
5+
from collections.abc import AsyncIterable
6+
from typing import Final
67

78
import pytest
89
from asgi_lifespan import LifespanManager
@@ -24,9 +25,10 @@
2425
get_tasks_manager,
2526
)
2627
from servicelib.fastapi.long_running_tasks.server import setup as setup_server
27-
from servicelib.fastapi.long_running_tasks.server import start_task
28+
from servicelib.fastapi.long_running_tasks.server import (
29+
start_task,
30+
)
2831
from servicelib.long_running_tasks._errors import (
29-
TaskClientResultError,
3032
TaskClientTimeoutError,
3133
)
3234

@@ -149,16 +151,14 @@ async def test_task_result_task_result_is_an_error(
149151

150152
url = TypeAdapter(AnyHttpUrl).validate_python("http://backgroud.testserver.io/")
151153
client = Client(app=bg_task_app, async_client=async_client, base_url=url)
152-
with pytest.raises(TaskClientResultError) as exec_info:
154+
with pytest.raises(RuntimeError, match="I am failing as requested"):
153155
async with periodic_task_result(
154156
client,
155157
task_id,
156158
task_timeout=10,
157159
status_poll_interval=TASK_SLEEP_INTERVAL / 3,
158160
):
159161
pass
160-
assert f"{exec_info.value}".startswith(f"Task {task_id} finished with exception:")
161-
assert "I am failing as requested" in f"{exec_info.value}"
162162
await _assert_task_removed(async_client, task_id, router_prefix)
163163

164164

0 commit comments

Comments
 (0)