Skip to content

Commit 4b23aa5

Browse files
author
Sergio García Prado
committed
ISSUE #51
* Add `build_create` method to the `SagaExecutionDatabaseOperationFactory` interface.
1 parent a282dca commit 4b23aa5

File tree

11 files changed

+325
-78
lines changed

11 files changed

+325
-78
lines changed

packages/core/minos-microservice-saga/minos/saga/executions/saga.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def __init__(
8686
self.user = user
8787

8888
@classmethod
89-
def from_raw(cls, raw: Union[dict[str, Any], SagaExecution], **kwargs) -> SagaExecution:
89+
def from_raw(cls, raw: Union[dict[str, Any], Iterable[Any], SagaExecution], **kwargs) -> SagaExecution:
9090
"""Build a new instance from a raw representation.
9191
9292
:param raw: The raw representation of the instance.
@@ -96,7 +96,20 @@ def from_raw(cls, raw: Union[dict[str, Any], SagaExecution], **kwargs) -> SagaEx
9696
if isinstance(raw, cls):
9797
return raw
9898

99-
raw = raw.copy()
99+
if isinstance(raw, dict):
100+
raw = raw.copy()
101+
else:
102+
keys = [
103+
"uuid",
104+
"definition",
105+
"status",
106+
"executed_steps",
107+
"paused_step",
108+
"context",
109+
"already_rollback",
110+
"user",
111+
]
112+
raw = dict(zip(keys, raw))
100113

101114
current = raw | kwargs
102115
current["definition"] = Saga.from_raw(current["definition"])
@@ -315,8 +328,8 @@ def raw(self) -> dict[str, Any]:
315328
:return: A ``dict`` instance.
316329
"""
317330
return {
318-
"definition": self.definition.raw,
319331
"uuid": str(self.uuid),
332+
"definition": self.definition.raw,
320333
"status": self.status.raw,
321334
"executed_steps": [step.raw for step in self.executed_steps],
322335
"paused_step": None if self.paused_step is None else self.paused_step.raw,

packages/core/minos-microservice-saga/tests/test_saga/test_executions/test_saga/test_raw.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
import uuid
23
from unittest.mock import (
34
MagicMock,
45
patch,
@@ -29,19 +30,25 @@ def setUp(self) -> None:
2930
self.broker_publisher.send = self.publish_mock
3031

3132
def test_from_raw(self):
32-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
33+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
3334
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
3435
observed = SagaExecution.from_raw(expected)
3536
self.assertEqual(expected, observed)
3637

38+
def test_from_raw_iterable(self):
39+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
40+
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
41+
observed = SagaExecution.from_raw(expected.raw.values())
42+
self.assertEqual(expected, observed)
43+
3744
def test_from_raw_without_user(self):
38-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
45+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
3946
expected = SagaExecution.from_definition(ADD_ORDER)
4047
observed = SagaExecution.from_raw(expected)
4148
self.assertEqual(expected, observed)
4249

4350
def test_created(self):
44-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
51+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
4552
execution = SagaExecution.from_definition(ADD_ORDER, user=self.user)
4653

4754
expected = {
@@ -87,7 +94,7 @@ def test_created(self):
8794
self.assertEqual(expected, observed)
8895

8996
def test_created_without_user(self):
90-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
97+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
9198
execution = SagaExecution.from_definition(ADD_ORDER)
9299

93100
expected = {
@@ -182,7 +189,7 @@ async def test_partial_step(self):
182189
"uuid": "a74d9d6d-290a-492e-afcc-70607958f65d",
183190
}
184191

185-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
192+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
186193
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
187194
with self.assertRaises(SagaPausedExecutionStepException):
188195
await expected.execute()
@@ -268,7 +275,7 @@ async def test_executed_step(self):
268275
"uuid": "a74d9d6d-290a-492e-afcc-70607958f65d",
269276
}
270277

271-
with patch("uuid.uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
278+
with patch.object(uuid, "uuid4", return_value=UUID("a74d9d6d-290a-492e-afcc-70607958f65d")):
272279
expected = SagaExecution.from_definition(ADD_ORDER, user=self.user)
273280
with self.assertRaises(SagaPausedExecutionStepException):
274281
await expected.execute()

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
AiopgDeltaDatabaseOperationFactory,
1717
AiopgLockDatabaseOperationFactory,
1818
AiopgManagementDatabaseOperationFactory,
19+
AiopgSagaExecutionDatabaseOperationFactory,
1920
AiopgSnapshotDatabaseOperationFactory,
2021
AiopgSnapshotQueryDatabaseOperationBuilder,
2122
AiopgTransactionDatabaseOperationFactory,

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/factories/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
AiopgBrokerSubscriberDuplicateValidatorDatabaseOperationFactory,
1515
AiopgBrokerSubscriberQueueDatabaseOperationFactory,
1616
)
17+
from .saga import (
18+
AiopgSagaExecutionDatabaseOperationFactory,
19+
)
1720
from .transactions import (
1821
AiopgTransactionDatabaseOperationFactory,
1922
)
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
from __future__ import (
2+
annotations,
3+
)
4+
5+
import json
6+
from uuid import (
7+
UUID,
8+
)
9+
10+
from psycopg2.sql import (
11+
SQL,
12+
)
13+
14+
from minos.common import (
15+
ComposedDatabaseOperation,
16+
DatabaseOperation,
17+
)
18+
from minos.saga import (
19+
SagaExecutionDatabaseOperationFactory,
20+
)
21+
22+
from ..clients import (
23+
AiopgDatabaseClient,
24+
)
25+
from ..operations import (
26+
AiopgDatabaseOperation,
27+
)
28+
29+
30+
# noinspection SqlNoDataSourceInspection,SqlResolve,PyMethodMayBeStatic,SqlDialectInspection
31+
class AiopgSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
32+
"""Aiopg Saga Execution Database Operation Factory class."""
33+
34+
def build_table_name(self) -> str:
35+
"""Get the table name.
36+
37+
:return: A ``str`` value.
38+
"""
39+
return "saga_execution"
40+
41+
def build_create(self) -> DatabaseOperation:
42+
"""Build the database operation to create the delta table.
43+
44+
:return: A ``DatabaseOperation`` instance.s
45+
"""
46+
return ComposedDatabaseOperation(
47+
[
48+
AiopgDatabaseOperation(
49+
'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";',
50+
lock="uuid-ossp",
51+
),
52+
AiopgDatabaseOperation(
53+
"""
54+
DO
55+
$$
56+
BEGIN
57+
IF NOT EXISTS(SELECT *
58+
FROM pg_type typ
59+
INNER JOIN pg_namespace nsp
60+
ON nsp.oid = typ.typnamespace
61+
WHERE nsp.nspname = current_schema()
62+
AND typ.typname = 'saga_execution_status_type') THEN
63+
CREATE TYPE saga_execution_status_type AS ENUM (
64+
'created', 'running', 'paused', 'finished', 'errored'
65+
);
66+
END IF;
67+
END;
68+
$$
69+
LANGUAGE plpgsql;
70+
""",
71+
lock=self.build_table_name(),
72+
),
73+
AiopgDatabaseOperation(
74+
f"""
75+
CREATE TABLE IF NOT EXISTS {self.build_table_name()} (
76+
"uuid" UUID PRIMARY KEY,
77+
"definition" JSONB NOT NULL,
78+
"status" saga_execution_status_type NOT NULL,
79+
"executed_steps" JSONB NOT NULL,
80+
"paused_step" JSONB NOT NULL,
81+
"context" TEXT NOT NULL,
82+
"already_rollback" BOOL NOT NULL,
83+
"user" UUID
84+
);
85+
""",
86+
lock=self.build_table_name(),
87+
),
88+
]
89+
)
90+
91+
def build_store(self, uuid: UUID, **kwargs) -> DatabaseOperation:
92+
"""Build the database operation to store a saga execution.
93+
94+
:param uuid: The identifier of the saga execution.
95+
:param kwargs: The attributes of the saga execution.
96+
:return: A ``DatabaseOperation`` instance.
97+
"""
98+
query = SQL(
99+
f"""
100+
INSERT INTO {self.build_table_name()} (
101+
"uuid", "definition", "status", "executed_steps", "paused_step", "context", "already_rollback", "user"
102+
)
103+
VALUES (
104+
%(uuid)s,
105+
%(definition)s,
106+
%(status)s,
107+
%(executed_steps)s,
108+
%(paused_step)s,
109+
%(context)s,
110+
%(already_rollback)s,
111+
%(user)s
112+
)
113+
ON CONFLICT (uuid)
114+
DO
115+
UPDATE SET
116+
"definition" = %(definition)s,
117+
"status" = %(status)s,
118+
"executed_steps" = %(executed_steps)s,
119+
"paused_step" = %(paused_step)s,
120+
"context" = %(context)s,
121+
"already_rollback" = %(already_rollback)s,
122+
"user" = %(user)s
123+
;
124+
"""
125+
)
126+
# FIXME
127+
kwargs["definition"] = json.dumps(kwargs["definition"])
128+
kwargs["executed_steps"] = json.dumps(kwargs["executed_steps"])
129+
kwargs["paused_step"] = json.dumps(kwargs["paused_step"])
130+
return AiopgDatabaseOperation(query, {"uuid": uuid} | kwargs)
131+
132+
def build_load(self, uuid: UUID) -> DatabaseOperation:
133+
"""Build the database operation to load a saga execution.
134+
135+
:param uuid: The identifier of the saga execution.
136+
:return: A ``DatabaseOperation`` instance.
137+
"""
138+
return AiopgDatabaseOperation(
139+
SQL(f"SELECT * FROM {self.build_table_name()} WHERE uuid = %(uuid)s"), {"uuid": uuid}
140+
)
141+
142+
def build_delete(self, uuid: UUID) -> DatabaseOperation:
143+
"""Build the database operation to delete a saga execution.
144+
145+
:param uuid: The identifier of the saga execution.
146+
:return: A ``DatabaseOperation`` instance.
147+
"""
148+
return AiopgDatabaseOperation(
149+
SQL(f"DELETE FROM {self.build_table_name()} WHERE uuid = %(uuid)s"),
150+
{"uuid": uuid},
151+
)
152+
153+
154+
AiopgDatabaseClient.set_factory(SagaExecutionDatabaseOperationFactory, AiopgSagaExecutionDatabaseOperationFactory)

0 commit comments

Comments
 (0)