Skip to content

Commit 2843039

Browse files
author
Sergio García Prado
authored
Merge pull request #477 from minos-framework/issue-176-add-saga-runner-timeout
#176 - Add `timeout` to `SagaRunner.run`
2 parents 6937cfb + 5e17999 commit 2843039

File tree

4 files changed

+96
-56
lines changed
  • packages/core
    • minos-microservice-common/minos/common
    • minos-microservice-networks/tests/test_networks/test_brokers/test_collections/test_queues
    • minos-microservice-saga

4 files changed

+96
-56
lines changed

packages/core/minos-microservice-common/minos/common/setup.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
import logging
66
import warnings
7+
from asyncio import (
8+
shield,
9+
)
710
from pathlib import (
811
Path,
912
)
@@ -95,7 +98,7 @@ async def setup(self) -> None:
9598
"""
9699
if not self._already_setup:
97100
logger.debug(f"Setting up a {type(self).__name__!r} instance...")
98-
await self._setup()
101+
await shield(self._setup())
99102
self._already_setup = True
100103

101104
async def _setup(self) -> None:
@@ -111,7 +114,7 @@ async def destroy(self) -> None:
111114
"""
112115
if self._already_setup:
113116
logger.debug(f"Destroying a {type(self).__name__!r} instance...")
114-
await self._destroy()
117+
await shield(self._destroy())
115118
self._already_setup = False
116119

117120
async def _destroy(self) -> None:

packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_collections/test_queues/test_database.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,6 @@ async def test_aiter(self):
7575
]
7676

7777
queue = DatabaseBrokerQueue.from_config(self.config, operation_factory=self.operation_factory)
78-
await queue.setup()
79-
await queue.enqueue(messages[0])
80-
await queue.enqueue(messages[1])
8178

8279
with patch.object(
8380
MockedDatabaseClient,
@@ -90,6 +87,9 @@ async def test_aiter(self):
9087
cycle([FakeAsyncIterator([(0,)])]),
9188
),
9289
):
90+
await queue.setup()
91+
await queue.enqueue(messages[0])
92+
await queue.enqueue(messages[1])
9393

9494
observed = list()
9595
async for message in queue:
@@ -110,11 +110,11 @@ async def test_dequeue_with_count(self):
110110
"fetch_all",
111111
return_value=FakeAsyncIterator([[1, messages[0].avro_bytes], [2, bytes()], [3, messages[1].avro_bytes]]),
112112
):
113-
async with DatabaseBrokerQueue.from_config(self.config, operation_factory=self.operation_factory) as queue:
114-
queue._get_count = AsyncMock(side_effect=[3, 0])
113+
queue = DatabaseBrokerQueue.from_config(self.config, operation_factory=self.operation_factory)
114+
queue._get_count = AsyncMock(side_effect=[3, 0])
115115

116-
async with queue:
117-
observed = [await queue.dequeue(), await queue.dequeue()]
116+
async with queue:
117+
observed = [await queue.dequeue(), await queue.dequeue()]
118118

119119
self.assertEqual(messages, observed)
120120

packages/core/minos-microservice-saga/minos/saga/executions/runners.py

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
import logging
88
import warnings
9+
from asyncio import (
10+
TimeoutError,
11+
wait_for,
12+
)
913
from functools import (
1014
reduce,
1115
)
@@ -96,6 +100,7 @@ async def run(
96100
pause_on_disk: bool = False,
97101
raise_on_error: bool = True,
98102
return_execution: bool = True,
103+
timeout: Optional[float] = None,
99104
**kwargs,
100105
) -> Union[UUID, SagaExecution]:
101106
"""Perform a run of a ``Saga``.
@@ -114,73 +119,69 @@ async def run(
114119
but with ``Errored`` status.
115120
:param return_execution: If ``True`` the ``SagaExecution`` instance is returned. Otherwise, only the
116121
identifier (``UUID``) is returned.
122+
:param timeout: Maximum execution time in seconds.
117123
:param kwargs: Additional named arguments.
118124
:return: This method does not return anything.
119125
"""
120126
if isinstance(definition, SagaDecoratorWrapper):
121127
definition = definition.meta.definition
122128

123-
if response is not None:
124-
return await self._load_and_run(
125-
response=response,
126-
autocommit=autocommit,
127-
pause_on_disk=pause_on_disk,
128-
raise_on_error=raise_on_error,
129-
return_execution=return_execution,
130-
**kwargs,
131-
)
132-
133-
return await self._run_new(
134-
definition=definition,
135-
context=context,
136-
user=user,
129+
if response is None:
130+
execution = await self._create(definition, context, user)
131+
else:
132+
execution = await self._load(response)
133+
134+
execution = await self._run(
135+
execution,
136+
timeout=timeout,
137+
response=response,
137138
autocommit=autocommit,
138139
pause_on_disk=pause_on_disk,
139140
raise_on_error=raise_on_error,
140-
return_execution=return_execution,
141141
**kwargs,
142142
)
143+
if return_execution:
144+
return execution
143145

144-
async def _run_new(
145-
self, definition: Saga, context: Optional[SagaContext] = None, user: Optional[UUID] = None, **kwargs
146-
) -> Union[UUID, SagaExecution]:
146+
return execution.uuid
147+
148+
@staticmethod
149+
async def _create(definition: Saga, context: Optional[SagaContext], user: Optional[UUID]) -> SagaExecution:
147150
if REQUEST_USER_CONTEXT_VAR.get() is not None:
148151
if user is not None:
149152
warnings.warn("The `user` Argument will be ignored in favor of the `user` ContextVar", RuntimeWarning)
150153
user = REQUEST_USER_CONTEXT_VAR.get()
151154

152-
execution = SagaExecution.from_definition(definition, context=context, user=user)
153-
return await self._run(execution, **kwargs)
155+
return SagaExecution.from_definition(definition, context=context, user=user)
154156

155-
async def _load_and_run(self, response: SagaResponse, **kwargs) -> Union[UUID, SagaExecution]:
156-
execution = await self.storage.load(response.uuid)
157-
return await self._run(execution, response=response, **kwargs)
157+
async def _load(self, response: SagaResponse) -> Union[UUID, SagaExecution]:
158+
return await self.storage.load(response.uuid)
158159

159-
async def _run(
160-
self,
161-
execution: SagaExecution,
162-
pause_on_disk: bool = False,
163-
raise_on_error: bool = True,
164-
return_execution: bool = True,
165-
**kwargs,
166-
) -> Union[UUID, SagaExecution]:
160+
async def _run(self, execution: SagaExecution, raise_on_error: bool, **kwargs) -> SagaExecution:
167161
try:
168-
if pause_on_disk:
169-
await self._run_with_pause_on_disk(execution, **kwargs)
170-
else:
171-
await self._run_with_pause_on_memory(execution, **kwargs)
162+
await self._run_with_timeout(execution, **kwargs)
172163
except SagaFailedExecutionException as exc:
173164
if raise_on_error:
174165
raise exc
175-
logger.exception(f"The execution identified by {execution.uuid!s} failed")
166+
logger.exception(f"The execution identified by {execution.uuid!s} failed: {exc.exception!r}")
176167
finally:
177168
await self.storage.store(execution)
178169
self._update_request_headers(execution)
179170

180-
if return_execution:
181-
return execution
171+
return execution
182172

183-
return execution.uuid
173+
async def _run_with_timeout(self, execution: SagaExecution, timeout: Optional[float], **kwargs) -> None:
174+
future = self._run_with_pause(execution, **kwargs)
175+
try:
176+
return await wait_for(future, timeout=timeout)
177+
except TimeoutError as exc:
178+
raise SagaFailedExecutionException(exc)
179+
180+
async def _run_with_pause(self, execution: SagaExecution, pause_on_disk: bool, **kwargs) -> None:
181+
if pause_on_disk:
182+
await self._run_with_pause_on_disk(execution, **kwargs)
183+
else:
184+
await self._run_with_pause_on_memory(execution, **kwargs)
184185

185186
@staticmethod
186187
def _update_request_headers(execution: SagaExecution) -> None:
@@ -197,9 +198,11 @@ def _update_request_headers(execution: SagaExecution) -> None:
197198
headers["related_services"] = ",".join(related_services)
198199

199200
@staticmethod
200-
async def _run_with_pause_on_disk(execution: SagaExecution, autocommit: bool = True, **kwargs) -> None:
201+
async def _run_with_pause_on_disk(
202+
execution: SagaExecution, response: Optional[SagaResponse] = None, autocommit: bool = True, **kwargs
203+
) -> None:
201204
try:
202-
await execution.execute(autocommit=False, **kwargs)
205+
await execution.execute(autocommit=False, response=response, **kwargs)
203206
if autocommit:
204207
await execution.commit(**kwargs)
205208
except SagaPausedExecutionStepException:

packages/core/minos-microservice-saga/tests/test_saga/test_executions/test_runners.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,28 +91,62 @@ async def test_context_runner(self):
9191

9292
async def test_run(self):
9393
expected = SagaExecution.from_definition(ADD_ORDER)
94-
mock = AsyncMock(return_value=expected)
95-
self.runner._run_new = mock
94+
create_mock = AsyncMock()
95+
run_mock = AsyncMock(return_value=expected)
96+
self.runner._create = create_mock
97+
self.runner._run = run_mock
9698

9799
observed = await self.runner.run(ADD_ORDER)
98100
self.assertEqual(expected, observed)
99101

100102
async def test_run_from_wrapper(self):
101103
expected = SagaExecution.from_definition(DeleteOrderSaga)
102-
mock = AsyncMock(return_value=expected)
103-
self.runner._run_new = mock
104+
create_mock = AsyncMock()
105+
run_mock = AsyncMock(return_value=expected)
106+
self.runner._create = create_mock
107+
self.runner._run = run_mock
104108

105109
observed = await self.runner.run(DeleteOrderSaga)
106110
self.assertEqual(expected, observed)
107111

108112
async def test_load_and_run(self):
109113
expected = SagaExecution.from_definition(ADD_ORDER)
110-
mock = AsyncMock(return_value=expected)
111-
self.runner._load_and_run = mock
114+
load_mock = AsyncMock()
115+
run_mock = AsyncMock(return_value=expected)
116+
self.runner._load = load_mock
117+
self.runner._run = run_mock
112118

113119
observed = await self.runner.run(response=SagaResponse(uuid=expected.uuid))
114120
self.assertEqual(expected, observed)
115121

122+
async def test_run_with_timeout(self):
123+
expected = SagaExecution.from_definition(ADD_ORDER)
124+
create_mock = AsyncMock(return_value=expected)
125+
store_mock = AsyncMock()
126+
update_headers_mock = AsyncMock
127+
run_mock = AsyncMock()
128+
self.runner._create = create_mock
129+
self.runner.storage.store = store_mock
130+
self.runner._update_request_headers = update_headers_mock
131+
self.runner._run_with_pause = run_mock
132+
133+
observed = await self.runner.run(ADD_ORDER, timeout=60)
134+
self.assertEqual(expected, observed)
135+
136+
async def test_run_with_timeout_raises(self):
137+
expected = SagaExecution.from_definition(ADD_ORDER)
138+
create_mock = AsyncMock(return_value=expected)
139+
store_mock = AsyncMock()
140+
update_headers_mock = AsyncMock
141+
run_mock = AsyncMock()
142+
self.runner._create = create_mock
143+
self.runner.storage.store = store_mock
144+
self.runner._update_request_headers = update_headers_mock
145+
self.runner._run_with_pause = run_mock
146+
147+
with self.assertRaises(SagaFailedExecutionException):
148+
await self.runner.run(ADD_ORDER, timeout=0.0)
149+
116150
async def test_run_with_pause_on_memory(self):
117151
self.broker_subscriber_builder.with_messages(
118152
[

0 commit comments

Comments
 (0)