Skip to content

Commit 131758d

Browse files
author
Sergio García Prado
authored
Merge pull request #468 from minos-framework/issue-51-implement-aiopg-saga-execution-repository
#51 - Implement `aiopg`'s `SagaExecutionRepository`
2 parents 7387b6b + 33026bd commit 131758d

File tree

20 files changed

+437
-83
lines changed

20 files changed

+437
-83
lines changed

.github/workflows/minos-database-aiopg-tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ on:
1111
- 'packages/core/minos-microservice-aggregate/**'
1212
- 'packages/core/minos-microservice-networks/**'
1313
- 'packages/core/minos-microservice-transactions/**'
14+
- 'packages/core/minos-microservice-saga/**'
1415
- 'packages/core/minos-microservice-common/**'
1516

1617
jobs:

packages/core/minos-microservice-saga/minos/saga/executions/repositories/database/factories.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
ABC,
33
abstractmethod,
44
)
5+
from typing import (
6+
Any,
7+
Optional,
8+
)
59
from uuid import (
610
UUID,
711
)
@@ -16,10 +20,35 @@ class SagaExecutionDatabaseOperationFactory(DatabaseOperationFactory, ABC):
1620
"""Saga Execution Database Operation Factory class."""
1721

1822
@abstractmethod
19-
def build_store(self, uuid: UUID, **kwargs) -> DatabaseOperation:
23+
def build_create(self) -> DatabaseOperation:
24+
"""Build the database operation to create the delta table.
25+
26+
:return: A ``DatabaseOperation`` instance.s
27+
"""
28+
29+
@abstractmethod
30+
def build_store(
31+
self,
32+
uuid: UUID,
33+
definition: dict[str, Any],
34+
status: str,
35+
executed_steps: list[dict[str, Any]],
36+
paused_step: Optional[dict[str, Any]],
37+
context: str,
38+
already_rollback: bool,
39+
user: Optional[UUID],
40+
**kwargs
41+
) -> DatabaseOperation:
2042
"""Build the database operation to store a saga execution.
2143
2244
:param uuid: The identifier of the saga execution.
45+
:param definition: The ``Saga`` definition.
46+
:param context: The execution context.
47+
:param status: The status of the execution.
48+
:param executed_steps: The executed steps of the execution.
49+
:param paused_step: The paused step of the execution.
50+
:param already_rollback: ``True`` if already rollback of ``False`` otherwise.
51+
:param user: The user that launched the execution.
2352
:param kwargs: The attributes of the saga execution.
2453
:return: A ``DatabaseOperation`` instance.
2554
"""

packages/core/minos-microservice-saga/minos/saga/executions/repositories/database/impl.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ def __init__(self, *args, database_key: Optional[tuple[str]] = None, **kwargs):
3636
database_key = ("saga",)
3737
super().__init__(*args, database_key=database_key, **kwargs)
3838

39+
async def _setup(self) -> None:
40+
await super()._setup()
41+
operation = self.database_operation_factory.build_create()
42+
await self.execute_on_database(operation)
43+
3944
async def _store(self, execution: SagaExecution) -> None:
4045
operation = self.database_operation_factory.build_store(**execution.raw)
4146
await self.execute_on_database(operation)

packages/core/minos-microservice-saga/minos/saga/executions/saga.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,19 @@ def __init__(
6969
*args,
7070
**kwargs,
7171
):
72+
"""Initialize a Saga Execution.
73+
74+
:param definition: The ``Saga`` definition.
75+
:param uuid: The identifier of the execution.
76+
:param context: The execution context.
77+
:param status: The status of the execution.
78+
:param steps: The executed steps of the execution.
79+
:param paused_step: The paused step of the execution.
80+
:param already_rollback: ``True`` if already rollback of ``False`` otherwise.
81+
:param user: The user that launched the execution.
82+
:param args: Additional positional arguments.
83+
:param kwargs: Additional named arguments.
84+
"""
7285
definition.validate() # If not valid, raises an exception.
7386

7487
if steps is None:
@@ -94,7 +107,7 @@ def from_raw(cls, raw: Union[dict[str, Any], SagaExecution], **kwargs) -> SagaEx
94107
if isinstance(raw, cls):
95108
return raw
96109

97-
raw = raw.copy()
110+
raw = dict(raw)
98111

99112
current = raw | kwargs
100113
current["definition"] = Saga.from_raw(current["definition"])
@@ -313,8 +326,8 @@ def raw(self) -> dict[str, Any]:
313326
:return: A ``dict`` instance.
314327
"""
315328
return {
316-
"definition": self.definition.raw,
317329
"uuid": str(self.uuid),
330+
"definition": self.definition.raw,
318331
"status": self.status.raw,
319332
"executed_steps": [step.raw for step in self.executed_steps],
320333
"paused_step": None if self.paused_step is None else self.paused_step.raw,

packages/core/minos-microservice-saga/minos/saga/testing.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
3636
"""For testing purposes"""
3737

38+
def build_create(self) -> DatabaseOperation:
39+
"""For testing purposes"""
40+
return MockedDatabaseOperation("create_table")
41+
3842
def build_store(self, uuid: UUID, **kwargs) -> DatabaseOperation:
3943
"""For testing purposes"""
4044
return MockedDatabaseOperation("create_table")

packages/core/minos-microservice-saga/tests/test_saga/test_executions/test_repositories/test_database/test_factories.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ def test_abstract(self):
1616
self.assertTrue(issubclass(SagaExecutionDatabaseOperationFactory, (DatabaseOperationFactory, ABC)))
1717
# noinspection PyUnresolvedReferences
1818
self.assertEqual(
19-
{"build_store", "build_load", "build_delete"}, SagaExecutionDatabaseOperationFactory.__abstractmethods__
19+
{"build_create", "build_store", "build_load", "build_delete"},
20+
SagaExecutionDatabaseOperationFactory.__abstractmethods__,
2021
)
2122

2223

packages/core/minos-microservice-saga/tests/test_saga/test_executions/test_saga/test_raw.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
import uuid
23
from unittest.mock import (
34
MagicMock,
45
patch,
@@ -29,19 +30,19 @@ def setUp(self) -> None:
2930
self.broker_publisher.send = self.publish_mock
3031

3132
def test_from_raw(self):
32-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
33+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
3334
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
3435
observed = SagaExecution.from_raw(expected)
3536
self.assertEqual(expected, observed)
3637

3738
def test_from_raw_without_user(self):
38-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
39+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
3940
expected = SagaExecution.from_definition(ADD_ORDER)
4041
observed = SagaExecution.from_raw(expected)
4142
self.assertEqual(expected, observed)
4243

4344
def test_created(self):
44-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
45+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
4546
execution = SagaExecution.from_definition(ADD_ORDER, user=self.user)
4647

4748
expected = {
@@ -87,7 +88,7 @@ def test_created(self):
8788
self.assertEqual(expected, observed)
8889

8990
def test_created_without_user(self):
90-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
91+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
9192
execution = SagaExecution.from_definition(ADD_ORDER)
9293

9394
expected = {
@@ -182,7 +183,7 @@ async def test_partial_step(self):
182183
"uuid": "a74d9d6d-290a-492e-afcc-70607958f65d",
183184
}
184185

185-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
186+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
186187
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
187188
with self.assertRaises(SagaPausedExecutionStepException):
188189
await expected.execute()
@@ -268,7 +269,7 @@ async def test_executed_step(self):
268269
"uuid": "a74d9d6d-290a-492e-afcc-70607958f65d",
269270
}
270271

271-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
272+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
272273
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
273274
with self.assertRaises(SagaPausedExecutionStepException):
274275
await expected.execute()

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
AiopgDeltaDatabaseOperationFactory,
1717
AiopgLockDatabaseOperationFactory,
1818
AiopgManagementDatabaseOperationFactory,
19+
AiopgSagaExecutionDatabaseOperationFactory,
1920
AiopgSnapshotDatabaseOperationFactory,
2021
AiopgSnapshotQueryDatabaseOperationBuilder,
2122
AiopgTransactionDatabaseOperationFactory,

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
OperationalError,
2828
ProgrammingError,
2929
)
30+
from psycopg2.extras import (
31+
DictCursor,
32+
)
3033

3134
from minos.common import (
3235
CircuitBreakerMixin,
@@ -180,7 +183,7 @@ async def _execute_cursor(self, operation: str, parameters: dict):
180183
if not await self.is_connected():
181184
await self.recreate()
182185

183-
self._cursor = await self._connection.cursor(timeout=self._cursor_timeout)
186+
self._cursor = await self._connection.cursor(timeout=self._cursor_timeout, cursor_factory=DictCursor)
184187
try:
185188
await self._cursor.execute(operation=operation, parameters=parameters)
186189
except OperationalError as exc:

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/factories/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
AiopgBrokerSubscriberDuplicateValidatorDatabaseOperationFactory,
1515
AiopgBrokerSubscriberQueueDatabaseOperationFactory,
1616
)
17+
from .saga import (
18+
AiopgSagaExecutionDatabaseOperationFactory,
19+
)
1720
from .transactions import (
1821
AiopgTransactionDatabaseOperationFactory,
1922
)

0 commit comments

Comments
 (0)