Skip to content

Commit ac25ba3

Browse files
author
Andrei Neagu
committed
extended tests to ensure working as expected
1 parent 15cef78 commit ac25ba3

File tree

3 files changed

+56
-18
lines changed

3 files changed

+56
-18
lines changed

packages/service-library/src/servicelib/resilent_long_running/_client.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from settings_library.redis import RedisSettings
1010

1111
from ._errors import (
12-
AlreadyStartedError,
1312
FinishedWithError,
1413
NoMoreRetryAttemptsError,
1514
TimedOutError,
@@ -161,17 +160,8 @@ async def _start_job_if_missing(
161160
) -> None:
162161
# if job is missing on server side start it
163162
if await self._rpc_interface.get_status(unique_id) == JobStatus.NOT_FOUND:
164-
try:
165-
await self._rpc_interface.start(
166-
name, unique_id, timeout=timeout, **params
167-
)
168-
await self._store_interface.update_entry_expiry(
169-
unique_id, expire=timeout
170-
)
171-
except AlreadyStartedError:
172-
_logger.info(
173-
"unique_id='%s', was already running, did not start", unique_id
174-
)
163+
await self._rpc_interface.start(name, unique_id, timeout=timeout, **params)
164+
await self._store_interface.update_entry_expiry(unique_id, expire=timeout)
175165

176166
async def _format_remaining_attempts(
177167
self, unique_id: JobUniqueId, retry_count: NonNegativeInt
@@ -263,14 +253,15 @@ async def ensure_result(
263253
timeout -- maximum time to wait for the result
264254
265255
Keyword Arguments:
266-
is_unique -- only one instance of this task can exsist at any given time (default: {False})
256+
is_unique -- only one instance of the remote handler can exsist at any given time (default: {False})
267257
retry_count -- how many times to retry execution before giving up (default: {3})
268258
269259
Raises:
270260
UnexpectedResultTypeError: wrong result type
271261
TimedOutError: did not finish in time
262+
AlreadyStartedError: a handler marked as unique with the same params is already running
272263
NoMoreRetryAttemptsError: no more retry attempts left and still raised an error
273-
FinishedWithError: task on remove finised with an error
264+
FinishedWithError: handler on remove finised with an error
274265
275266
Returns:
276267
the resturn value of the handler invoked remotley

packages/service-library/src/servicelib/resilent_long_running/_rpc/server.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ async def setup(self) -> None:
4848
self.result,
4949
):
5050
router.expose(
51-
reraise_if_error_type=(JobNotFoundError, NoResultIsAvailableError)
51+
reraise_if_error_type=(
52+
JobNotFoundError,
53+
AlreadyStartedError,
54+
NoResultIsAvailableError,
55+
)
5256
)(handler)
5357

5458
await self._rabbitmq_rpc_server.register_router(router, self._rpc_namespace)

packages/service-library/tests/resilient_long_running/test_workflow.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
Server,
1919
TimedOutError,
2020
)
21+
from servicelib.resilent_long_running._client import AlreadyStartedError
2122
from servicelib.resilent_long_running._models import JobUniqueId
2223
from servicelib.resilent_long_running.runners.asyncio_tasks import (
2324
AsyncioTasksJobInterface,
2425
AsyncTaskRegistry,
2526
)
27+
from servicelib.utils import limited_gather
2628
from settings_library.rabbit import RabbitSettings
2729
from settings_library.redis import RedisSettings
2830
from tenacity import (
@@ -66,6 +68,10 @@ async def some_f() -> None:
6668
async def echo_f(data: Any) -> Any:
6769
return data
6870

71+
@registry.expose()
72+
async def inc_number_f(number: int) -> int:
73+
return number + 1
74+
6975
@registry.expose()
7076
async def raising_f() -> None:
7177
msg = "I always raise an error"
@@ -134,6 +140,25 @@ async def test_workflow(
134140
assert type(result) is expected_type
135141

136142

143+
@pytest.mark.parametrize("is_unique", [True, False])
144+
async def test_workflow_paralle_calls_to_same_handler(
145+
server: Server, client: Client, is_unique: bool
146+
):
147+
async def _to_run(number: int) -> None:
148+
result = await client.ensure_result(
149+
"inc_number_f",
150+
expected_type=int,
151+
timeout=timedelta(seconds=30),
152+
number=number,
153+
is_unique=is_unique,
154+
)
155+
assert result == number + 1
156+
157+
count = 100
158+
await limited_gather(*(_to_run(number) for number in range(count)), limit=count)
159+
await _assert_tasks_count(server, count=0)
160+
161+
137162
@pytest.mark.parametrize("is_unique", [True, False])
138163
async def test_timeout_error(server: Server, client: Client, is_unique: bool):
139164
with pytest.raises(TimedOutError):
@@ -319,18 +344,36 @@ async def main():
319344
process.kill()
320345

321346

322-
@pytest.mark.parametrize("is_unique", [False])
347+
async def test_start_unique_task_twice_is_not_allowed(server: Server, client: Client):
348+
async def _runner() -> None:
349+
await client.ensure_result(
350+
"sleep_for_f",
351+
expected_type=type(None),
352+
timeout=timedelta(seconds=30),
353+
duration=2,
354+
is_unique=True,
355+
)
356+
357+
with pytest.raises(AlreadyStartedError):
358+
await limited_gather(*(_runner() for _ in range(2)), limit=2)
359+
360+
323361
async def test_cancellation_of_client_can_resume_process(
324362
server: Server,
325363
client_process: Callable[[Callable[[Client], Awaitable[None]]], Process],
326364
client: Client,
327-
is_unique: bool,
328365
):
329366
await _assert_tasks_count(server, count=0)
330367

368+
# NOTE: only when the task is maked as unique can the client pickup
369+
# the running handler running on remote once it's restarted
370+
371+
# NOTE: you cannot start the same unique task twice
372+
# a task is unique when all of it's parameters are the same and is maked as unique
373+
331374
async def _runner(client_: Client) -> None:
332375
await _sleep_for_ensure_result(
333-
client_, retry_count=3, timeout=timedelta(minutes=1), is_unique=is_unique
376+
client_, retry_count=3, timeout=timedelta(minutes=1), is_unique=True
334377
)
335378

336379
# start task in process

0 commit comments

Comments
 (0)