Skip to content

Commit 407fa83

Browse files
author
Andrei Neagu
committed
fixed interfaces
1 parent 4bf6d63 commit 407fa83

File tree

7 files changed

+61
-51
lines changed

7 files changed

+61
-51
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
from typing import Any
1+
from typing import Annotated, Any
22

33
from aiohttp import web
4-
from pydantic import BaseModel
4+
from models_library.rest_base import RequestParameters
5+
from pydantic import BaseModel, Field
56

67
from ...aiohttp import status
78
from ...long_running_tasks import lrt_api
89
from ...long_running_tasks.models import TaskGet, TaskId
9-
from ..requests_validation import parse_request_path_parameters_as
10+
from ..requests_validation import (
11+
parse_request_path_parameters_as,
12+
parse_request_query_parameters_as,
13+
)
1014
from ..rest_responses import create_data_response
1115
from ._manager import get_long_running_manager
1216

@@ -65,16 +69,29 @@ async def get_task_result(request: web.Request) -> web.Response | Any:
6569
)
6670

6771

72+
class MyRequestQueryParams(RequestParameters):
73+
wait_for_removal: Annotated[
74+
bool,
75+
Field(
76+
description=(
77+
"when True waits for the task to be removed "
78+
"completly instead of returning immediately"
79+
)
80+
),
81+
] = True
82+
83+
6884
@routes.delete("/{task_id}", name="remove_task")
6985
async def remove_task(request: web.Request) -> web.Response:
7086
path_params = parse_request_path_parameters_as(_PathParam, request)
87+
query_params = parse_request_query_parameters_as(MyRequestQueryParams, request)
7188
long_running_manager = get_long_running_manager(request.app)
7289

7390
await lrt_api.remove_task(
7491
long_running_manager.rpc_client,
7592
long_running_manager.lrt_namespace,
7693
long_running_manager.get_task_context(request),
7794
path_params.task_id,
78-
wait_for_removal=False, # frontend does not care about waiting for this
95+
wait_for_removal=query_params.wait_for_removal,
7996
)
8097
return web.json_response(status=status.HTTP_204_NO_CONTENT)

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Annotated, Any
22

3-
from fastapi import APIRouter, Depends, Request, status
3+
from fastapi import APIRouter, Depends, Query, Request, status
44

55
from ...long_running_tasks import lrt_api
66
from ...long_running_tasks.models import TaskGet, TaskId, TaskResult, TaskStatus
@@ -30,7 +30,7 @@ async def list_tasks(
3030
for t in await lrt_api.list_tasks(
3131
long_running_manager.rpc_client,
3232
long_running_manager.lrt_namespace,
33-
task_context={},
33+
long_running_manager.get_task_context(request),
3434
)
3535
]
3636

@@ -45,16 +45,16 @@ async def list_tasks(
4545
@cancel_on_disconnect
4646
async def get_task_status(
4747
request: Request,
48-
task_id: TaskId,
4948
long_running_manager: Annotated[
5049
FastAPILongRunningManager, Depends(get_long_running_manager)
5150
],
51+
task_id: TaskId,
5252
) -> TaskStatus:
5353
assert request # nosec
5454
return await lrt_api.get_task_status(
5555
long_running_manager.rpc_client,
5656
long_running_manager.lrt_namespace,
57-
task_context={},
57+
long_running_manager.get_task_context(request),
5858
task_id=task_id,
5959
)
6060

@@ -71,16 +71,16 @@ async def get_task_status(
7171
@cancel_on_disconnect
7272
async def get_task_result(
7373
request: Request,
74-
task_id: TaskId,
7574
long_running_manager: Annotated[
7675
FastAPILongRunningManager, Depends(get_long_running_manager)
7776
],
77+
task_id: TaskId,
7878
) -> TaskResult | Any:
7979
assert request # nosec
8080
return await lrt_api.get_task_result(
8181
long_running_manager.rpc_client,
8282
long_running_manager.lrt_namespace,
83-
task_context={},
83+
long_running_manager.get_task_context(request),
8484
task_id=task_id,
8585
)
8686

@@ -97,16 +97,26 @@ async def get_task_result(
9797
@cancel_on_disconnect
9898
async def remove_task(
9999
request: Request,
100-
task_id: TaskId,
101100
long_running_manager: Annotated[
102101
FastAPILongRunningManager, Depends(get_long_running_manager)
103102
],
103+
task_id: TaskId,
104+
*,
105+
wait_for_removal: Annotated[
106+
bool,
107+
Query(
108+
description=(
109+
"when True waits for the task to be removed "
110+
"completly instead of returning immediately"
111+
),
112+
),
113+
] = True,
104114
) -> None:
105115
assert request # nosec
106116
await lrt_api.remove_task(
107117
long_running_manager.rpc_client,
108118
long_running_manager.lrt_namespace,
109-
task_context={},
119+
long_running_manager.get_task_context(request),
110120
task_id=task_id,
111-
wait_for_removal=True, # only used by internal services, they will wait as before
121+
wait_for_removal=wait_for_removal,
112122
)

packages/service-library/src/servicelib/long_running_tasks/_lrt_client.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ async def remove_task(
137137
*,
138138
task_context: TaskContext,
139139
task_id: TaskId,
140-
reraise_errors: bool,
141140
wait_for_removal: bool,
142141
cancellation_timeout: timedelta | None,
143142
) -> None:
@@ -158,7 +157,6 @@ async def remove_task(
158157
task_context=task_context,
159158
task_id=task_id,
160159
wait_for_removal=wait_for_removal,
161-
reraise_errors=reraise_errors,
162160
timeout_s=timeout_s,
163161
)
164162
assert result is None # nosec

packages/service-library/src/servicelib/long_running_tasks/_lrt_server.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import logging
2+
from contextlib import suppress
23
from typing import TYPE_CHECKING, Any
34

45
from ..logging_errors import create_troubleshootting_log_kwargs
56
from ..rabbitmq import RPCRouter
67
from ._serialization import string_to_object
7-
from .errors import BaseLongRunningError
8+
from .errors import BaseLongRunningError, TaskNotFoundError
89
from .models import (
910
ErrorResponse,
1011
RegisteredTaskName,
@@ -110,12 +111,12 @@ async def get_task_result(
110111
raise ValueError(msg)
111112
finally:
112113
# Ensure the task is removed regardless of the result
113-
await long_running_manager.tasks_manager.remove_task(
114-
task_id,
115-
with_task_context=task_context,
116-
wait_for_removal=True,
117-
reraise_errors=False,
118-
)
114+
with suppress(TaskNotFoundError):
115+
await long_running_manager.tasks_manager.remove_task(
116+
task_id,
117+
with_task_context=task_context,
118+
wait_for_removal=True,
119+
)
119120

120121

121122
@router.expose(reraise_if_error_type=(BaseLongRunningError,))
@@ -125,11 +126,9 @@ async def remove_task(
125126
task_context: TaskContext,
126127
task_id: TaskId,
127128
wait_for_removal: bool,
128-
reraise_errors: bool,
129129
) -> None:
130130
await long_running_manager.tasks_manager.remove_task(
131131
task_id,
132132
with_task_context=task_context,
133133
wait_for_removal=wait_for_removal,
134-
reraise_errors=reraise_errors,
135134
)

packages/service-library/src/servicelib/long_running_tasks/lrt_api.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ async def remove_task(
125125
lrt_namespace,
126126
task_id=task_id,
127127
task_context=task_context,
128-
reraise_errors=True,
129128
wait_for_removal=wait_for_removal,
130129
cancellation_timeout=cancellation_timeout,
131130
)

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,13 @@ async def setup(self) -> None:
202202
async def teardown(self) -> None:
203203
# ensure all created tasks are cancelled
204204
for tracked_task in await self._tasks_data.list_tasks_data():
205-
await self.remove_task(
206-
tracked_task.task_id,
207-
tracked_task.task_context,
208-
# when closing we do not care about pending errors
209-
wait_for_removal=True,
210-
reraise_errors=False,
211-
)
205+
# when closing we do not care about pending errors
206+
with suppress(TaskNotFoundError):
207+
await self.remove_task(
208+
tracked_task.task_id,
209+
tracked_task.task_context,
210+
wait_for_removal=True,
211+
)
212212

213213
for task in self._created_tasks.values():
214214
_logger.warning(
@@ -278,10 +278,7 @@ async def _stale_tasks_monitor(self) -> None:
278278
).model_dump_json(),
279279
)
280280
await self.remove_task(
281-
task_id,
282-
with_task_context=task_context,
283-
wait_for_removal=True,
284-
reraise_errors=False,
281+
task_id, with_task_context=task_context, wait_for_removal=True
285282
)
286283

287284
async def _cancelled_tasks_removal(self) -> None:
@@ -438,15 +435,12 @@ async def remove_task(
438435
with_task_context: TaskContext,
439436
*,
440437
wait_for_removal: bool,
441-
reraise_errors: bool = True,
442438
) -> None:
443-
"""cancels and removes task"""
444-
try:
445-
tracked_task = await self._get_tracked_task(task_id, with_task_context)
446-
except TaskNotFoundError:
447-
if reraise_errors:
448-
raise
449-
return
439+
"""
440+
cancels and removes task
441+
raises TaskNotFoundError if the task cannot be found
442+
"""
443+
tracked_task = await self._get_tracked_task(task_id, with_task_context)
450444

451445
await self._tasks_data.mark_task_for_removal(
452446
tracked_task.task_id, tracked_task.task_context

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -498,13 +498,6 @@ async def test_remove_unknown_task(
498498
"invalid_id", with_task_context=empty_context, wait_for_removal=True
499499
)
500500

501-
await long_running_manager.tasks_manager.remove_task(
502-
"invalid_id",
503-
with_task_context=empty_context,
504-
wait_for_removal=True,
505-
reraise_errors=False,
506-
)
507-
508501

509502
async def test__cancelled_tasks_worker_equivalent_of_cancellation_from_a_different_process(
510503
long_running_manager: BaseLongRunningManager, empty_context: TaskContext

0 commit comments

Comments
 (0)