Skip to content

Commit f160ddb

Browse files
author
Andrei Neagu
committed
removal does not wait for task to be removed and supports a timeout
1 parent 3f76411 commit f160ddb

File tree

8 files changed

+64
-11
lines changed

8 files changed

+64
-11
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_routes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,6 @@ async def remove_task(request: web.Request) -> web.Response:
7575
long_running_manager,
7676
long_running_manager.get_task_context(request),
7777
path_params.task_id,
78+
wait_for_removal=False, # frontend does not care about waiting for this
7879
)
7980
return web.json_response(status=status.HTTP_204_NO_CONTENT)

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ async def start_long_running_task(
103103
long_running_manager,
104104
task_context,
105105
task_id,
106+
wait_for_removal=True,
106107
)
107108
raise
108109

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,5 @@ async def remove_task(
106106
long_running_manager,
107107
task_context={},
108108
task_id=task_id,
109+
wait_for_removal=True, # only used by internal services, they will wait as before
109110
)

packages/service-library/src/servicelib/long_running_tasks/_lrt_client.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
_logger = logging.getLogger(__name__)
2424

25-
_RPC_TIMEOUT_TASK_REMOVAL: Final[PositiveInt] = int(
25+
_RPC_MAX_CANCELLATION_TIMEOUT: Final[PositiveInt] = int(
2626
timedelta(minutes=60).total_seconds()
2727
)
2828
_RPC_TIMEOUT_SHORT_REQUESTS: Final[PositiveInt] = int(
@@ -134,14 +134,28 @@ async def remove_task(
134134
*,
135135
task_context: TaskContext,
136136
task_id: TaskId,
137-
reraise_errors: bool = True,
137+
wait_for_removal: bool,
138+
reraise_errors: bool,
139+
cancellation_timeout: timedelta | None = None,
138140
) -> None:
141+
timeout_s = (
142+
_RPC_MAX_CANCELLATION_TIMEOUT
143+
if cancellation_timeout is None
144+
else int(cancellation_timeout.total_seconds())
145+
)
146+
147+
# NOTE: task always gets cancelled even if not waiting for it
148+
# request will return immediatlye, no need to wait so much
149+
if not wait_for_removal:
150+
timeout_s = _RPC_TIMEOUT_SHORT_REQUESTS
151+
139152
result = await rabbitmq_rpc_client.request(
140153
get_rabbit_namespace(namespace),
141154
TypeAdapter(RPCMethodName).validate_python("remove_task"),
142155
task_context=task_context,
143156
task_id=task_id,
157+
wait_for_removal=wait_for_removal,
144158
reraise_errors=reraise_errors,
145-
timeout_s=_RPC_TIMEOUT_TASK_REMOVAL,
159+
timeout_s=timeout_s,
146160
)
147161
assert result is None # nosec

packages/service-library/src/servicelib/long_running_tasks/_lrt_server.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,12 @@ async def remove_task(
106106
*,
107107
task_context: TaskContext,
108108
task_id: TaskId,
109+
wait_for_removal: bool,
109110
reraise_errors: bool,
110111
) -> None:
111112
await long_running_manager.tasks_manager.remove_task(
112-
task_id, with_task_context=task_context, reraise_errors=reraise_errors
113+
task_id,
114+
with_task_context=task_context,
115+
wait_for_removal=wait_for_removal,
116+
reraise_errors=reraise_errors,
113117
)

packages/service-library/src/servicelib/long_running_tasks/lrt_api.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ async def remove_task(
101101
long_running_manager: BaseLongRunningManager,
102102
task_context: TaskContext,
103103
task_id: TaskId,
104+
*,
105+
wait_for_removal: bool,
104106
) -> None:
105107
"""cancels and removes the task"""
106108
await _lrt_client.remove_task(
107109
rabbitmq_rpc_client,
108110
long_running_manager.lrt_namespace,
109111
task_id=task_id,
110112
task_context=task_context,
113+
wait_for_removal=wait_for_removal,
114+
reraise_errors=True,
111115
)

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ async def teardown(self) -> None:
189189
tracked_task.task_id,
190190
tracked_task.task_context,
191191
# when closing we do not care about pending errors
192+
wait_for_removal=True,
192193
reraise_errors=False,
193194
)
194195

@@ -260,7 +261,10 @@ async def _stale_tasks_monitor(self) -> None:
260261
).model_dump_json(),
261262
)
262263
await self.remove_task(
263-
task_id, with_task_context=task_context, reraise_errors=False
264+
task_id,
265+
with_task_context=task_context,
266+
wait_for_removal=True,
267+
reraise_errors=False,
264268
)
265269

266270
async def _cancelled_tasks_removal(self) -> None:
@@ -400,6 +404,7 @@ async def remove_task(
400404
task_id: TaskId,
401405
with_task_context: TaskContext,
402406
*,
407+
wait_for_removal: bool,
403408
reraise_errors: bool = True,
404409
) -> None:
405410
"""cancels and removes task"""
@@ -414,6 +419,9 @@ async def remove_task(
414419
tracked_task.task_id, tracked_task.task_context
415420
)
416421

422+
if not wait_for_removal:
423+
return
424+
417425
# wait for task to be removed since it might not have been running
418426
# in this process
419427
async for attempt in AsyncRetrying(

packages/service-library/tests/aiohttp/long_running_tasks/test_long_running_tasks_with_task_context.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
from servicelib.long_running_tasks.task import TaskContext
3030
from settings_library.rabbit import RabbitSettings
3131
from settings_library.redis import RedisSettings
32+
from tenacity.asyncio import AsyncRetrying
33+
from tenacity.retry import retry_if_exception_type
34+
from tenacity.stop import stop_after_delay
35+
from tenacity.wait import wait_fixed
3236

3337
pytest_simcore_core_services_selection = [
3438
"rabbit",
@@ -176,15 +180,31 @@ async def test_cancel_task(
176180
task_id=task_id
177181
)
178182
# calling cancel without task context should find nothing
179-
resp = await client_with_task_context.delete(f"{cancel_url}")
180-
await assert_status(resp, status.HTTP_404_NOT_FOUND)
183+
# no longer waits for removal to end
184+
async for attempt in AsyncRetrying(
185+
wait=wait_fixed(0.1),
186+
stop=stop_after_delay(5),
187+
reraise=True,
188+
retry=retry_if_exception_type(AssertionError),
189+
):
190+
with attempt:
191+
resp = await client_with_task_context.delete(f"{cancel_url}")
192+
await assert_status(resp, status.HTTP_404_NOT_FOUND)
181193
# calling with context should find and delete the task
182194
resp = await client_with_task_context.delete(
183195
f"{cancel_url.update_query(task_context)}"
184196
)
185197
await assert_status(resp, status.HTTP_204_NO_CONTENT)
186198
# calling with context a second time should find nothing
187-
resp = await client_with_task_context.delete(
188-
f"{cancel_url.update_query(task_context)}"
189-
)
190-
await assert_status(resp, status.HTTP_404_NOT_FOUND)
199+
# no longer waits for removal to end
200+
async for attempt in AsyncRetrying(
201+
wait=wait_fixed(0.1),
202+
stop=stop_after_delay(5),
203+
reraise=True,
204+
retry=retry_if_exception_type(AssertionError),
205+
):
206+
with attempt:
207+
resp = await client_with_task_context.delete(
208+
f"{cancel_url.update_query(task_context)}"
209+
)
210+
await assert_status(resp, status.HTTP_404_NOT_FOUND)

0 commit comments

Comments
 (0)