Skip to content

Commit 0d6c316

Browse files
author
Sergio García Prado
committed
ISSUE #99
* Refactor `SagaExecutionStorage` as `SagaExecutionRepository`. * Add `DatabaseSagaExecutionRepository`.
1 parent f5e60d0 commit 0d6c316

File tree

14 files changed

+252
-187
lines changed

14 files changed

+252
-187
lines changed

packages/core/minos-microservice-saga/minos/saga/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
)
4747
from .executions import (
4848
ConditionalSagaStepExecution,
49+
DatabaseSagaExecutionRepository,
4950
Executor,
5051
LocalExecutor,
5152
LocalSagaStepExecution,
@@ -54,7 +55,7 @@
5455
ResponseExecutor,
5556
SagaExecution,
5657
SagaExecutionDatabaseOperationFactory,
57-
SagaExecutionStorage,
58+
SagaExecutionRepository,
5859
SagaStatus,
5960
SagaStepExecution,
6061
SagaStepStatus,
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
from .commit import (
22
TransactionCommitter,
33
)
4-
from .database import (
5-
SagaExecutionDatabaseOperationFactory,
6-
)
74
from .executors import (
85
Executor,
96
LocalExecutor,
107
RequestExecutor,
118
ResponseExecutor,
129
)
10+
from .repositories import (
11+
DatabaseSagaExecutionRepository,
12+
SagaExecutionDatabaseOperationFactory,
13+
SagaExecutionRepository,
14+
)
1315
from .saga import (
1416
SagaExecution,
1517
)
@@ -23,6 +25,3 @@
2325
RemoteSagaStepExecution,
2426
SagaStepExecution,
2527
)
26-
from .storage import (
27-
SagaExecutionStorage,
28-
)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .abc import (
2+
SagaExecutionRepository,
3+
)
4+
from .database import (
5+
DatabaseSagaExecutionRepository,
6+
SagaExecutionDatabaseOperationFactory,
7+
)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from abc import (
2+
ABC,
3+
abstractmethod,
4+
)
5+
from typing import (
6+
Union,
7+
)
8+
from uuid import (
9+
UUID,
10+
)
11+
12+
from minos.common import (
13+
SetupMixin,
14+
)
15+
16+
from ..saga import (
17+
SagaExecution,
18+
)
19+
20+
21+
class SagaExecutionRepository(SetupMixin, ABC):
22+
"""TODO"""
23+
24+
async def store(self, execution: SagaExecution) -> None:
25+
"""Store an execution.
26+
27+
:param execution: Execution to be stored.
28+
:return: This method does not return anything.
29+
"""
30+
return await self._store(execution)
31+
32+
async def _store(self, execution: SagaExecution) -> None:
33+
raise NotImplementedError
34+
35+
async def load(self, uuid: Union[str, UUID]) -> SagaExecution:
36+
"""Load the saga execution stored on the given key.
37+
38+
:param uuid: The key to identify the execution.
39+
:return: A ``SagaExecution`` instance.
40+
"""
41+
if not isinstance(uuid, UUID):
42+
uuid = UUID(uuid)
43+
return await self._load(uuid)
44+
45+
@abstractmethod
46+
async def _load(self, uuid: UUID) -> SagaExecution:
47+
raise NotImplementedError
48+
49+
async def delete(self, uuid: Union[SagaExecution, str, UUID]) -> None:
50+
"""Delete the reference of the given key.
51+
52+
:param uuid: Execution key to be deleted.
53+
:return: This method does not return anything.
54+
"""
55+
if isinstance(uuid, SagaExecution):
56+
uuid = uuid.uuid
57+
if not isinstance(uuid, UUID):
58+
uuid = UUID(uuid)
59+
return await self._delete(uuid)
60+
61+
async def _delete(self, key: UUID) -> None:
62+
raise NotImplementedError
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
from .factories import (
22
SagaExecutionDatabaseOperationFactory,
33
)
4+
from .impl import (
5+
DatabaseSagaExecutionRepository,
6+
)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from __future__ import (
2+
annotations,
3+
)
4+
5+
from contextlib import (
6+
suppress,
7+
)
8+
from typing import (
9+
Type,
10+
)
11+
from uuid import (
12+
UUID,
13+
)
14+
15+
from minos.common import (
16+
Config,
17+
MinosConfigException,
18+
MinosJsonBinaryProtocol,
19+
MinosStorage,
20+
MinosStorageLmdb,
21+
)
22+
23+
from ....exceptions import (
24+
SagaExecutionNotFoundException,
25+
)
26+
from ...saga import (
27+
SagaExecution,
28+
)
29+
from ..abc import (
30+
SagaExecutionRepository,
31+
)
32+
33+
34+
class DatabaseSagaExecutionRepository(SagaExecutionRepository):
35+
"""Saga Execution Storage class."""
36+
37+
def __init__(
38+
self,
39+
storage_cls: Type[MinosStorage] = MinosStorageLmdb,
40+
protocol=MinosJsonBinaryProtocol,
41+
db_name: str = "LocalState",
42+
*args,
43+
**kwargs,
44+
):
45+
super().__init__(*args, **kwargs)
46+
self.db_name = db_name
47+
self._storage = storage_cls.build(protocol=protocol, **kwargs)
48+
49+
@classmethod
50+
def _from_config(cls, config: Config, **kwargs) -> SagaExecutionRepository:
51+
with suppress(MinosConfigException):
52+
kwargs |= config.get_database_by_name("saga")
53+
return cls(**kwargs)
54+
55+
async def _store(self, execution: SagaExecution) -> None:
56+
key = str(execution.uuid)
57+
value = execution.raw
58+
self._storage.update(table=self.db_name, key=key, value=value)
59+
60+
async def _delete(self, uuid: UUID) -> None:
61+
self._storage.delete(table=self.db_name, key=str(uuid))
62+
63+
async def _load(self, uuid: UUID) -> SagaExecution:
64+
value = self._storage.get(table=self.db_name, key=str(uuid))
65+
if value is None:
66+
raise SagaExecutionNotFoundException(f"The execution identified by {uuid} was not found.")
67+
execution = SagaExecution.from_raw(value)
68+
return execution

packages/core/minos-microservice-saga/minos/saga/executions/storage.py

Lines changed: 0 additions & 90 deletions
This file was deleted.

packages/core/minos-microservice-saga/minos/saga/manager.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@
4545
SagaPausedExecutionStepException,
4646
)
4747
from .executions import (
48+
DatabaseSagaExecutionRepository,
4849
SagaExecution,
49-
SagaExecutionStorage,
50+
SagaExecutionRepository,
5051
SagaStatus,
5152
)
5253
from .messages import (
@@ -66,7 +67,7 @@ class SagaManager(SetupMixin):
6667
@Inject()
6768
def __init__(
6869
self,
69-
storage: SagaExecutionStorage,
70+
storage: SagaExecutionRepository,
7071
broker_pool: Optional[BrokerClientPool] = None,
7172
pool_factory: Optional[PoolFactory] = None,
7273
*args,
@@ -92,9 +93,17 @@ def _from_config(cls, *args, config: Config, **kwargs) -> SagaManager:
9293
:param kwargs: Additional named arguments.
9394
:return: A new ``SagaManager`` instance.
9495
"""
95-
storage = SagaExecutionStorage.from_config(config, **kwargs)
96+
storage = DatabaseSagaExecutionRepository.from_config(config, **kwargs)
9697
return cls(storage=storage, **kwargs)
9798

99+
async def _setup(self) -> None:
100+
await super()._setup()
101+
await self.storage.setup()
102+
103+
async def _destroy(self) -> None:
104+
await self.storage.destroy()
105+
await super()._destroy()
106+
98107
async def run(
99108
self,
100109
definition: Optional[Saga] = None,
@@ -161,7 +170,7 @@ async def _run_new(
161170
return await self._run(execution, **kwargs)
162171

163172
async def _load_and_run(self, response: SagaResponse, **kwargs) -> Union[UUID, SagaExecution]:
164-
execution = self.storage.load(response.uuid)
173+
execution = await self.storage.load(response.uuid)
165174
return await self._run(execution, response=response, **kwargs)
166175

167176
async def _run(
@@ -178,7 +187,7 @@ async def _run(
178187
else:
179188
await self._run_with_pause_on_memory(execution, **kwargs)
180189
except SagaFailedExecutionException as exc:
181-
self.storage.store(execution)
190+
await self.storage.store(execution)
182191
if raise_on_error:
183192
raise exc
184193
logger.exception(f"The execution identified by {execution.uuid!s} failed")
@@ -194,7 +203,7 @@ async def _run(
194203
headers["related_services"] = ",".join(related_services)
195204

196205
if execution.status == SagaStatus.Finished:
197-
self.storage.delete(execution)
206+
await self.storage.delete(execution)
198207

199208
if return_execution:
200209
return execution
@@ -207,7 +216,7 @@ async def _run_with_pause_on_disk(self, execution: SagaExecution, autocommit: bo
207216
if autocommit:
208217
await execution.commit(**kwargs)
209218
except SagaPausedExecutionStepException:
210-
self.storage.store(execution)
219+
await self.storage.store(execution)
211220
except SagaFailedExecutionException as exc:
212221
if autocommit:
213222
await execution.reject(**kwargs)
@@ -225,7 +234,7 @@ async def _run_with_pause_on_memory(
225234
await execution.execute(response=response, autocommit=False, **kwargs)
226235
except SagaPausedExecutionStepException:
227236
response = await self._get_response(broker, execution, **kwargs)
228-
self.storage.store(execution)
237+
await self.storage.store(execution)
229238
if autocommit:
230239
await execution.commit(**kwargs)
231240
except SagaFailedExecutionException as exc:

packages/core/minos-microservice-saga/tests/test_saga/test_executions/test_repositories/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)