Skip to content

Commit eb1b32f

Browse files
author
Sergio García Prado
committed
ISSUE #99
* Add `minos.saga.testing` module. * Implement `LmdbSagaExecutionDatabaseOperationFactory`.
1 parent 0d6c316 commit eb1b32f

File tree

15 files changed

+332
-109
lines changed

15 files changed

+332
-109
lines changed

packages/core/minos-microservice-saga/minos/saga/definitions/steps/abc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ def from_raw(cls, raw: Union[dict[str, Any], SagaStep], **kwargs) -> SagaStep:
5555
if isinstance(raw, cls):
5656
return raw
5757

58+
raw = raw.copy()
59+
5860
if "cls" in raw:
5961
# noinspection PyTypeChecker
6062
step_cls: type = import_module(raw.pop("cls"))
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
from abc import (
22
ABC,
3+
abstractmethod,
4+
)
5+
from uuid import (
6+
UUID,
37
)
48

59
from minos.common import (
10+
DatabaseOperation,
611
DatabaseOperationFactory,
712
)
813

914

1015
class SagaExecutionDatabaseOperationFactory(DatabaseOperationFactory, ABC):
1116
"""TODO"""
17+
18+
@abstractmethod
19+
def build_store(self, uuid: UUID, **kwargs) -> DatabaseOperation:
20+
"""TODO"""
21+
raise NotImplementedError
22+
23+
@abstractmethod
24+
def build_load(self, uuid: UUID) -> DatabaseOperation:
25+
"""TODO"""
26+
raise NotImplementedError
27+
28+
@abstractmethod
29+
def build_delete(self, uuid: UUID) -> DatabaseOperation:
30+
"""TODO"""
31+
raise NotImplementedError

packages/core/minos-microservice-saga/minos/saga/executions/repositories/database/impl.py

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,15 @@
22
annotations,
33
)
44

5-
from contextlib import (
6-
suppress,
7-
)
85
from typing import (
9-
Type,
6+
Optional,
107
)
118
from uuid import (
129
UUID,
1310
)
1411

1512
from minos.common import (
16-
Config,
17-
MinosConfigException,
18-
MinosJsonBinaryProtocol,
19-
MinosStorage,
20-
MinosStorageLmdb,
13+
DatabaseMixin,
2114
)
2215

2316
from ....exceptions import (
@@ -29,39 +22,31 @@
2922
from ..abc import (
3023
SagaExecutionRepository,
3124
)
25+
from .factories import (
26+
SagaExecutionDatabaseOperationFactory,
27+
)
3228

3329

34-
class DatabaseSagaExecutionRepository(SagaExecutionRepository):
30+
class DatabaseSagaExecutionRepository(SagaExecutionRepository, DatabaseMixin[SagaExecutionDatabaseOperationFactory]):
3531
"""Saga Execution Storage class."""
3632

37-
def __init__(
38-
self,
39-
storage_cls: Type[MinosStorage] = MinosStorageLmdb,
40-
protocol=MinosJsonBinaryProtocol,
41-
db_name: str = "LocalState",
42-
*args,
43-
**kwargs,
44-
):
45-
super().__init__(*args, **kwargs)
46-
self.db_name = db_name
47-
self._storage = storage_cls.build(protocol=protocol, **kwargs)
48-
49-
@classmethod
50-
def _from_config(cls, config: Config, **kwargs) -> SagaExecutionRepository:
51-
with suppress(MinosConfigException):
52-
kwargs |= config.get_database_by_name("saga")
53-
return cls(**kwargs)
33+
def __init__(self, *args, database_key: Optional[tuple[str]] = None, **kwargs):
34+
if database_key is None:
35+
database_key = ("saga",)
36+
super().__init__(*args, database_key=database_key, **kwargs)
5437

5538
async def _store(self, execution: SagaExecution) -> None:
56-
key = str(execution.uuid)
57-
value = execution.raw
58-
self._storage.update(table=self.db_name, key=key, value=value)
39+
operation = self.database_operation_factory.build_store(**execution.raw)
40+
await self.execute_on_database(operation)
5941

6042
async def _delete(self, uuid: UUID) -> None:
61-
self._storage.delete(table=self.db_name, key=str(uuid))
43+
operation = self.database_operation_factory.build_delete(uuid)
44+
await self.execute_on_database(operation)
6245

6346
async def _load(self, uuid: UUID) -> SagaExecution:
64-
value = self._storage.get(table=self.db_name, key=str(uuid))
47+
operation = self.database_operation_factory.build_load(uuid)
48+
49+
value = await self.execute_on_database_and_fetch_one(operation)
6550
if value is None:
6651
raise SagaExecutionNotFoundException(f"The execution identified by {uuid} was not found.")
6752
execution = SagaExecution.from_raw(value)

packages/core/minos-microservice-saga/minos/saga/executions/saga.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ def from_raw(cls, raw: Union[dict[str, Any], SagaExecution], **kwargs) -> SagaEx
9393
if isinstance(raw, cls):
9494
return raw
9595

96+
raw = raw.copy()
97+
9698
current = raw | kwargs
9799
current["definition"] = Saga.from_raw(current["definition"])
98100
current["status"] = SagaStatus.from_raw(current["status"])

packages/core/minos-microservice-saga/minos/saga/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def __init__(
8585
self.broker_pool = broker_pool
8686

8787
@classmethod
88-
def _from_config(cls, *args, config: Config, **kwargs) -> SagaManager:
88+
def _from_config(cls, config: Config, **kwargs) -> SagaManager:
8989
"""Build an instance from config.
9090
9191
:param args: Additional positional arguments.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
from abc import (
2+
ABC,
3+
abstractmethod,
4+
)
5+
from uuid import (
6+
UUID,
7+
)
8+
9+
from minos.common import (
10+
DatabaseOperation,
11+
DeclarativeModel,
12+
)
13+
from minos.common.testing import (
14+
MinosTestCase,
15+
MockedDatabaseClient,
16+
MockedDatabaseOperation,
17+
)
18+
19+
from .context import (
20+
SagaContext,
21+
)
22+
from .definitions import (
23+
Saga,
24+
)
25+
from .exceptions import (
26+
SagaExecutionNotFoundException,
27+
)
28+
from .executions import (
29+
SagaExecution,
30+
SagaExecutionDatabaseOperationFactory,
31+
SagaExecutionRepository,
32+
)
33+
34+
35+
class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
36+
"""For testing purposes"""
37+
38+
def build_store(self, uuid: UUID, **kwargs) -> DatabaseOperation:
39+
"""For testing purposes"""
40+
return MockedDatabaseOperation("create_table")
41+
42+
def build_load(self, uuid: UUID) -> DatabaseOperation:
43+
"""For testing purposes"""
44+
return MockedDatabaseOperation("create_table")
45+
46+
def build_delete(self, uuid: UUID) -> DatabaseOperation:
47+
"""For testing purposes"""
48+
return MockedDatabaseOperation("create_table")
49+
50+
51+
MockedDatabaseClient.set_factory(
52+
SagaExecutionDatabaseOperationFactory,
53+
MockedSagaExecutionDatabaseOperationFactory,
54+
)
55+
56+
57+
class Foo(DeclarativeModel):
58+
"""For testing purposes."""
59+
60+
foo: str
61+
62+
63+
def _fn1(context: SagaContext) -> SagaContext: # pragma: no cover
64+
context["payment"] = "payment"
65+
return context
66+
67+
68+
def _fn2(context: SagaContext) -> SagaContext: # pragma: no cover
69+
context["payment"] = None
70+
return context
71+
72+
73+
_SAGA = Saga().local_step(_fn1).on_failure(_fn2).commit()
74+
75+
76+
class SagaExecutionRepositoryTestCase(MinosTestCase, ABC):
77+
__test__ = False
78+
79+
def setUp(self) -> None:
80+
super().setUp()
81+
self.saga_execution_repository = self.build_saga_execution_repository()
82+
83+
async def asyncSetUp(self) -> None:
84+
await super().asyncSetUp()
85+
86+
await self.saga_execution_repository.setup()
87+
88+
execution = SagaExecution.from_definition(_SAGA)
89+
await execution.execute(autocommit=False)
90+
91+
self.execution = execution
92+
93+
self.another = SagaExecution.from_definition(_SAGA)
94+
95+
async def asyncTearDown(self):
96+
await self.saga_execution_repository.destroy()
97+
await super().asyncTearDown()
98+
99+
@abstractmethod
100+
def build_saga_execution_repository(self) -> SagaExecutionRepository:
101+
"""For testing purposes."""
102+
103+
async def test_store(self):
104+
await self.saga_execution_repository.store(self.execution)
105+
106+
self.assertEqual(self.execution, await self.saga_execution_repository.load(self.execution.uuid))
107+
108+
async def test_store_overwrite(self):
109+
await self.saga_execution_repository.store(self.execution)
110+
self.assertEqual(self.execution, await self.saga_execution_repository.load(self.execution.uuid))
111+
112+
self.another.uuid = self.execution.uuid
113+
await self.saga_execution_repository.store(self.another)
114+
115+
self.assertNotEqual(self.execution, await self.saga_execution_repository.load(self.execution.uuid))
116+
self.assertEqual(self.another, await self.saga_execution_repository.load(self.execution.uuid))
117+
118+
async def test_load_from_str(self):
119+
await self.saga_execution_repository.store(self.execution)
120+
self.assertEqual(self.execution, await self.saga_execution_repository.load(str(self.execution.uuid)))
121+
122+
async def test_load_raises(self):
123+
with self.assertRaises(SagaExecutionNotFoundException):
124+
await self.saga_execution_repository.load(self.execution.uuid)
125+
126+
async def test_delete(self):
127+
await self.saga_execution_repository.store(self.execution)
128+
await self.saga_execution_repository.delete(self.execution)
129+
with self.assertRaises(SagaExecutionNotFoundException):
130+
await self.saga_execution_repository.load(self.execution.uuid)
131+
132+
async def test_delete_from_str(self):
133+
await self.saga_execution_repository.store(self.execution)
134+
await self.saga_execution_repository.delete(str(self.execution.uuid))
135+
with self.assertRaises(SagaExecutionNotFoundException):
136+
await self.saga_execution_repository.load(self.execution.uuid)
Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
service:
2-
name: order
3-
aggregate: tests.utils.Order
4-
saga:
5-
storage:
6-
path: "./order.lmdb"
1+
version: 2
2+
name: order
3+
4+
databases:
5+
saga:
6+
client: minos.common.testing.MockedDatabaseClient
7+
path: "./order.lmdb"
8+
9+
pools:
10+
database: minos.common.DatabaseClientPool

0 commit comments

Comments
 (0)