Skip to content

Commit 1e4d300

Browse files
author
Andrei Neagu
committed
added cancellation test
1 parent c41e2a0 commit 1e4d300

File tree

4 files changed

+41
-6
lines changed

4 files changed

+41
-6
lines changed

packages/service-library/src/servicelib/long_running_interfaces/_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,5 +296,8 @@ async def ensure_result(
296296
# when completed remove the task form memory both on the server and the client
297297
# NOTE: unsure if these should be caught and ignored. In the case we decide to
298298
# catch them there should be some automatic cleanup in case they are forgotten
299+
300+
# remove from remote executor
299301
await self._rpc_interface.remove(unique_id)
302+
# remove distributed storage (redis)
300303
await self._store_interface.remove(unique_id)

packages/service-library/src/servicelib/long_running_interfaces/_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ def __init__(
4040
long_running_namespace: LongRunningNamespace,
4141
job_interface: BaseServerJobInterface,
4242
) -> None:
43-
self._rpc_interface = ServerRPCInterface(
43+
self.rpc_interface = ServerRPCInterface(
4444
rabbit_settings, long_running_namespace, job_interface
4545
)
4646

4747
# TODO: register jobs! and then use the interface to run them!!!
4848

4949
async def setup(self) -> None:
50-
await self._rpc_interface.setup()
50+
await self.rpc_interface.setup()
5151

5252
async def teardown(self) -> None:
53-
await self._rpc_interface.teardown()
53+
await self.rpc_interface.teardown()

packages/service-library/src/servicelib/long_running_interfaces/runners/asyncio_tasks/_core.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,3 @@ async def get_result(self, unique_id: JobUniqueId) -> Any | None:
7070

7171
task = self._tasks[unique_id]
7272
return task.result()
73-
74-
75-
# TODO: add tests individually for these so we knoe they work when registring starting and stopping and all the operations. Also cancellation

packages/service-library/tests/long_running_interfaces/test_workflow.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import pytest
1111
from pydantic import NonNegativeInt, ValidationError
12+
from servicelib.async_utils import cancel_wait_task
1213
from servicelib.long_running_interfaces import (
1314
Client,
1415
FinishedWithError,
@@ -22,6 +23,12 @@
2223
)
2324
from settings_library.rabbit import RabbitSettings
2425
from settings_library.redis import RedisSettings
26+
from tenacity import (
27+
retry,
28+
retry_if_exception_type,
29+
stop_after_delay,
30+
wait_fixed,
31+
)
2532

2633
pytest_simcore_core_services_selection = [
2734
"rabbit",
@@ -157,3 +164,31 @@ async def test_timeout_error_retry_count_zero(server: Server, client: Client):
157164

158165
assert "retry_count" in f"{exec_info.value}"
159166
assert "Input should be greater than 0" in f"{exec_info.value}"
167+
168+
169+
@retry(
170+
wait=wait_fixed(0.1),
171+
stop=stop_after_delay(5),
172+
retry=retry_if_exception_type(AssertionError),
173+
) # NOTE: function has to be async or the loop does not get a cahce to switch between retries
174+
async def _assert_tasks_count(server: Server, count: int) -> None:
175+
assert isinstance(server.rpc_interface.job_interface, AsyncioTasksJobInterface)
176+
task_count = len(
177+
server.rpc_interface.job_interface._tasks.values() # noqa: SLF001 # pylint:disable=protected-access
178+
)
179+
assert task_count == count
180+
181+
182+
async def test_cancellation_from_client(server: Server, client: Client):
183+
async def _to_run() -> None:
184+
await client.ensure_result(
185+
"sleep_forever_f", expected_type=type(None), timeout=timedelta(seconds=1)
186+
)
187+
188+
await _assert_tasks_count(server, count=0)
189+
190+
task = asyncio.create_task(_to_run())
191+
await _assert_tasks_count(server, count=1)
192+
193+
await cancel_wait_task(task, max_delay=5)
194+
await _assert_tasks_count(server, count=0)

0 commit comments

Comments
 (0)