Skip to content

Commit 1e80eab

Browse files
author
Sergio García Prado
committed
ISSUE #176
* Add tests for timeout.
1 parent f59c9d1 commit 1e80eab

File tree

2 files changed

+53
-21
lines changed

2 files changed

+53
-21
lines changed

packages/core/minos-microservice-saga/minos/saga/executions/runners.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import logging
88
import warnings
99
from asyncio import (
10+
TimeoutError,
1011
wait_for,
1112
)
1213
from functools import (
@@ -99,7 +100,7 @@ async def run(
99100
pause_on_disk: bool = False,
100101
raise_on_error: bool = True,
101102
return_execution: bool = True,
102-
timeout: Optional[float] = 60.0,
103+
timeout: Optional[float] = None,
103104
**kwargs,
104105
) -> Union[UUID, SagaExecution]:
105106
"""Perform a run of a ``Saga``.
@@ -118,7 +119,7 @@ async def run(
118119
but with ``Errored`` status.
119120
:param return_execution: If ``True`` the ``SagaExecution`` instance is returned. Otherwise, only the
120121
identifier (``UUID``) is returned.
121-
:param timeout: TODO
122+
:param timeout: Maximum execution time in seconds.
122123
:param kwargs: Additional named arguments.
123124
:return: This method does not return anything.
124125
"""
@@ -130,16 +131,19 @@ async def run(
130131
else:
131132
execution = await self._load(response)
132133

133-
future = self._run(
134+
execution = await self._run(
134135
execution,
136+
timeout=timeout,
135137
response=response,
136138
autocommit=autocommit,
137139
pause_on_disk=pause_on_disk,
138140
raise_on_error=raise_on_error,
139-
return_execution=return_execution,
140141
**kwargs,
141142
)
142-
return await wait_for(future, timeout=timeout)
143+
if return_execution:
144+
return execution
145+
146+
return execution.uuid
143147

144148
@staticmethod
145149
async def _create(definition: Saga, context: Optional[SagaContext], user: Optional[UUID]) -> SagaExecution:
@@ -153,31 +157,31 @@ async def _create(definition: Saga, context: Optional[SagaContext], user: Option
153157
async def _load(self, response: SagaResponse) -> Union[UUID, SagaExecution]:
154158
return await self.storage.load(response.uuid)
155159

156-
async def _run(
157-
self,
158-
execution: SagaExecution,
159-
pause_on_disk: bool = False,
160-
raise_on_error: bool = True,
161-
return_execution: bool = True,
162-
**kwargs,
163-
) -> Union[UUID, SagaExecution]:
160+
async def _run(self, execution: SagaExecution, raise_on_error: bool, **kwargs) -> SagaExecution:
164161
try:
165-
if pause_on_disk:
166-
await self._run_with_pause_on_disk(execution, **kwargs)
167-
else:
168-
await self._run_with_pause_on_memory(execution, **kwargs)
162+
await self._run_with_timeout(execution, **kwargs)
169163
except SagaFailedExecutionException as exc:
170164
if raise_on_error:
171165
raise exc
172-
logger.exception(f"The execution identified by {execution.uuid!s} failed")
166+
logger.exception(f"The execution identified by {execution.uuid!s} failed: {exc.exception!r}")
173167
finally:
174168
await self.storage.store(execution)
175169
self._update_request_headers(execution)
176170

177-
if return_execution:
178-
return execution
171+
return execution
179172

180-
return execution.uuid
173+
async def _run_with_timeout(self, execution: SagaExecution, timeout: Optional[float], **kwargs) -> None:
174+
future = self._run_with_pause(execution, **kwargs)
175+
try:
176+
return await wait_for(future, timeout=timeout)
177+
except TimeoutError as exc:
178+
raise SagaFailedExecutionException(exc)
179+
180+
async def _run_with_pause(self, execution: SagaExecution, pause_on_disk: bool, **kwargs) -> None:
181+
if pause_on_disk:
182+
await self._run_with_pause_on_disk(execution, **kwargs)
183+
else:
184+
await self._run_with_pause_on_memory(execution, **kwargs)
181185

182186
@staticmethod
183187
def _update_request_headers(execution: SagaExecution) -> None:

packages/core/minos-microservice-saga/tests/test_saga/test_executions/test_runners.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,34 @@ async def test_load_and_run(self):
119119
observed = await self.runner.run(response=SagaResponse(uuid=expected.uuid))
120120
self.assertEqual(expected, observed)
121121

122+
async def test_run_with_timeout(self):
123+
expected = SagaExecution.from_definition(ADD_ORDER)
124+
create_mock = AsyncMock(return_value=expected)
125+
store_mock = AsyncMock()
126+
update_headers_mock = AsyncMock
127+
run_mock = AsyncMock()
128+
self.runner._create = create_mock
129+
self.runner.storage.store = store_mock
130+
self.runner._update_request_headers = update_headers_mock
131+
self.runner._run_with_pause = run_mock
132+
133+
observed = await self.runner.run(ADD_ORDER, timeout=60)
134+
self.assertEqual(expected, observed)
135+
136+
async def test_run_with_timeout_raises(self):
137+
expected = SagaExecution.from_definition(ADD_ORDER)
138+
create_mock = AsyncMock(return_value=expected)
139+
store_mock = AsyncMock()
140+
update_headers_mock = AsyncMock
141+
run_mock = AsyncMock()
142+
self.runner._create = create_mock
143+
self.runner.storage.store = store_mock
144+
self.runner._update_request_headers = update_headers_mock
145+
self.runner._run_with_pause = run_mock
146+
147+
with self.assertRaises(SagaFailedExecutionException):
148+
await self.runner.run(ADD_ORDER, timeout=0.0)
149+
122150
async def test_run_with_pause_on_memory(self):
123151
self.broker_subscriber_builder.with_messages(
124152
[

0 commit comments

Comments
 (0)