Skip to content

Commit 3036e92

Browse files
author
Sergio García Prado
committed
ISSUE #98
* Increase coverage. * Minor improvements.
1 parent ba8d796 commit 3036e92

File tree

9 files changed

+343
-48
lines changed

9 files changed

+343
-48
lines changed

packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/database/factories/abc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
ABC,
33
abstractmethod,
44
)
5+
from collections.abc import Iterable
56
from datetime import (
67
datetime,
78
)
@@ -35,7 +36,7 @@ def build_create_table(self) -> DatabaseOperation:
3536
@abstractmethod
3637
def build_submit_row(
3738
self,
38-
transaction_uuids: tuple[UUID],
39+
transaction_uuids: Iterable[UUID],
3940
uuid: UUID,
4041
action: Action,
4142
name: str,

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/factories/aggregate/events.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from psycopg2.sql import (
1616
SQL,
1717
Composable,
18+
Identifier,
1819
Literal,
1920
Placeholder,
2021
)
@@ -40,6 +41,13 @@
4041
class AiopgEventDatabaseOperationFactory(EventDatabaseOperationFactory):
4142
"""Aiopg Event Database Operation Factory class."""
4243

44+
def build_table_name(self) -> str:
45+
"""Get the table name.
46+
47+
:return: A ``str`` value.
48+
"""
49+
return "aggregate_event"
50+
4351
def build_create_table(self) -> DatabaseOperation:
4452
"""Build the database operation to create the event table.
4553
@@ -68,11 +76,11 @@ def build_create_table(self) -> DatabaseOperation:
6876
$$
6977
LANGUAGE plpgsql;
7078
""",
71-
lock="aggregate_event",
79+
lock=self.build_table_name(),
7280
),
7381
AiopgDatabaseOperation(
74-
"""
75-
CREATE TABLE IF NOT EXISTS aggregate_event (
82+
f"""
83+
CREATE TABLE IF NOT EXISTS {self.build_table_name()} (
7684
id BIGSERIAL PRIMARY KEY,
7785
action ACTION_TYPE NOT NULL,
7886
uuid UUID NOT NULL,
@@ -84,14 +92,14 @@ def build_create_table(self) -> DatabaseOperation:
8492
UNIQUE (uuid, version, transaction_uuid)
8593
);
8694
""",
87-
lock="aggregate_event",
95+
lock=self.build_table_name(),
8896
),
8997
]
9098
)
9199

92100
def build_submit_row(
93101
self,
94-
transaction_uuids: tuple[UUID],
102+
transaction_uuids: Iterable[UUID],
95103
uuid: UUID,
96104
action: Action,
97105
name: str,
@@ -118,7 +126,7 @@ def build_submit_row(
118126
"""
119127
insert_values = SQL(
120128
"""
121-
INSERT INTO aggregate_event (id, action, uuid, name, version, data, created_at, transaction_uuid)
129+
INSERT INTO {table_name} (id, action, uuid, name, version, data, created_at, transaction_uuid)
122130
VALUES (
123131
default,
124132
%(action)s,
@@ -151,7 +159,7 @@ def build_submit_row(
151159

152160
from_sql, from_parameters = self._build_submit_from(transaction_uuids)
153161

154-
query = insert_values.format(from_parts=from_sql)
162+
query = insert_values.format(from_parts=from_sql, table_name=Identifier(self.build_table_name()))
155163
parameters = from_parameters | insert_parameters
156164

157165
return AiopgDatabaseOperation(query, parameters, lock)
@@ -160,7 +168,7 @@ def _build_submit_from(self, transaction_uuids: Iterable[UUID]) -> tuple[Composa
160168
select_transaction = SQL(
161169
"""
162170
SELECT {index} AS transaction_index, uuid, MAX(version) AS version
163-
FROM aggregate_event
171+
FROM {table_name}
164172
WHERE uuid = %(uuid)s AND transaction_uuid = {transaction_uuid}
165173
GROUP BY uuid
166174
"""
@@ -171,7 +179,13 @@ def _build_submit_from(self, transaction_uuids: Iterable[UUID]) -> tuple[Composa
171179
name = f"transaction_uuid_{index}"
172180
parameters[name] = transaction_uuid
173181

174-
from_query_parts.append(select_transaction.format(index=Literal(index), transaction_uuid=Placeholder(name)))
182+
from_query_parts.append(
183+
select_transaction.format(
184+
index=Literal(index),
185+
transaction_uuid=Placeholder(name),
186+
table_name=Identifier(self.build_table_name()),
187+
),
188+
)
175189

176190
query = SQL(" UNION ALL ").join(from_query_parts)
177191
return query, parameters
@@ -193,7 +207,7 @@ def build_select_rows(
193207
id_ge: Optional[int] = None,
194208
transaction_uuid: Optional[UUID] = None,
195209
transaction_uuid_ne: Optional[UUID] = None,
196-
transaction_uuid_in: Optional[tuple[UUID, ...]] = None,
210+
transaction_uuid_in: Optional[Iterable[UUID, ...]] = None,
197211
**kwargs,
198212
) -> DatabaseOperation:
199213
"""Build the database operation to select rows.
@@ -216,10 +230,12 @@ def build_select_rows(
216230
217231
:return: A ``DatabaseOperation`` instance.
218232
"""
233+
if transaction_uuid_in is not None:
234+
transaction_uuid_in = tuple(transaction_uuid_in)
219235

220-
_select_all = """
236+
_select_all = f"""
221237
SELECT uuid, name, version, data, id, action, created_at, transaction_uuid
222-
FROM aggregate_event
238+
FROM {self.build_table_name()}
223239
"""
224240

225241
conditions = list()
@@ -284,7 +300,7 @@ def build_select_max_id(self) -> DatabaseOperation:
284300
285301
:return: A ``DatabaseOperation`` instance.
286302
"""
287-
return AiopgDatabaseOperation("SELECT MAX(id) FROM aggregate_event;".strip())
303+
return AiopgDatabaseOperation(f"SELECT MAX(id) FROM {self.build_table_name()};".strip())
288304

289305

290306
AiopgDatabaseClient.register_factory(EventDatabaseOperationFactory, AiopgEventDatabaseOperationFactory)

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/factories/aggregate/snapshots/impl.py

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@
3939
class AiopgSnapshotDatabaseOperationFactory(SnapshotDatabaseOperationFactory):
4040
"""Aiopg Snapshot Database Operation Factory class."""
4141

42+
def build_table_name(self) -> str:
43+
"""Get the table name.
44+
45+
:return: A ``str`` value.
46+
"""
47+
return "snapshot"
48+
49+
def build_offset_table_name(self) -> str:
50+
"""Get the offset table name.
51+
52+
:return: A ``str`` value.
53+
"""
54+
return "snapshot_aux_offset"
55+
4256
def build_create_table(self) -> DatabaseOperation:
4357
"""Build the database operation to create the snapshot table.
4458
@@ -51,8 +65,8 @@ def build_create_table(self) -> DatabaseOperation:
5165
lock="uuid-ossp",
5266
),
5367
AiopgDatabaseOperation(
54-
"""
55-
CREATE TABLE IF NOT EXISTS snapshot (
68+
f"""
69+
CREATE TABLE IF NOT EXISTS {self.build_table_name()} (
5670
uuid UUID NOT NULL,
5771
name TEXT NOT NULL,
5872
version INT NOT NULL,
@@ -64,17 +78,17 @@ def build_create_table(self) -> DatabaseOperation:
6478
PRIMARY KEY (uuid, transaction_uuid)
6579
);
6680
""",
67-
lock="snapshot",
81+
lock=self.build_table_name(),
6882
),
6983
AiopgDatabaseOperation(
70-
"""
71-
CREATE TABLE IF NOT EXISTS snapshot_aux_offset (
84+
f"""
85+
CREATE TABLE IF NOT EXISTS {self.build_offset_table_name()} (
7286
id bool PRIMARY KEY DEFAULT TRUE,
7387
value BIGINT NOT NULL,
7488
CONSTRAINT id_uni CHECK (id)
7589
);
7690
""",
77-
lock="snapshot_aux_offset",
91+
lock=self.build_offset_table_name(),
7892
),
7993
]
8094
)
@@ -86,8 +100,8 @@ def build_delete_by_transactions(self, transaction_uuids: Iterable[UUID]) -> Dat
86100
:return: A ``DatabaseOperation`` instance.
87101
"""
88102
return AiopgDatabaseOperation(
89-
"""
90-
DELETE FROM snapshot
103+
f"""
104+
DELETE FROM {self.build_table_name()}
91105
WHERE transaction_uuid IN %(transaction_uuids)s;
92106
""",
93107
{"transaction_uuids": tuple(transaction_uuids)},
@@ -118,8 +132,10 @@ def build_insert(
118132
"""
119133

120134
return AiopgDatabaseOperation(
121-
"""
122-
INSERT INTO snapshot (uuid, name, version, schema, data, created_at, updated_at, transaction_uuid)
135+
f"""
136+
INSERT INTO {self.build_table_name()} (
137+
uuid, name, version, schema, data, created_at, updated_at, transaction_uuid
138+
)
123139
VALUES (
124140
%(uuid)s,
125141
%(name)s,
@@ -153,7 +169,7 @@ def build_query(
153169
condition: _Condition,
154170
ordering: Optional[_Ordering],
155171
limit: Optional[int],
156-
transaction_uuids: tuple[UUID, ...],
172+
transaction_uuids: Iterable[UUID],
157173
exclude_deleted: bool,
158174
) -> DatabaseOperation:
159175
"""Build the query database operation.
@@ -172,7 +188,13 @@ def build_query(
172188
:return: A ``DatabaseOperation`` instance.
173189
"""
174190
builder = AiopgSnapshotQueryDatabaseOperationBuilder(
175-
name, condition, ordering, limit, transaction_uuids, exclude_deleted
191+
name=name,
192+
condition=condition,
193+
ordering=ordering,
194+
limit=limit,
195+
transaction_uuids=transaction_uuids,
196+
exclude_deleted=exclude_deleted,
197+
table_name=self.build_table_name(),
176198
)
177199
query, parameters = builder.build()
178200

@@ -185,14 +207,17 @@ def build_store_offset(self, value: int) -> DatabaseOperation:
185207
:return: A ``DatabaseOperation`` instance.
186208
"""
187209
return AiopgDatabaseOperation(
188-
"""
189-
INSERT INTO snapshot_aux_offset (id, value)
210+
f"""
211+
INSERT INTO {self.build_offset_table_name()} (id, value)
190212
VALUES (TRUE, %(value)s)
191213
ON CONFLICT (id)
192-
DO UPDATE SET value = GREATEST(%(value)s, (SELECT value FROM snapshot_aux_offset WHERE id = TRUE));
214+
DO UPDATE SET value = GREATEST(
215+
%(value)s,
216+
(SELECT value FROM {self.build_offset_table_name()} WHERE id = TRUE)
217+
);
193218
""".strip(),
194219
{"value": value},
195-
lock="insert_snapshot_aux_offset",
220+
lock=f"insert_{self.build_offset_table_name()}",
196221
)
197222

198223
def build_get_offset(self) -> DatabaseOperation:
@@ -201,9 +226,9 @@ def build_get_offset(self) -> DatabaseOperation:
201226
:return: A ``DatabaseOperation`` instance.
202227
"""
203228
return AiopgDatabaseOperation(
204-
"""
229+
f"""
205230
SELECT value
206-
FROM snapshot_aux_offset
231+
FROM {self.build_offset_table_name()}
207232
WHERE id = TRUE;
208233
"""
209234
)

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/factories/aggregate/snapshots/queries.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from typing import (
66
Any,
7+
Iterable,
78
Optional,
89
)
910
from uuid import (
@@ -64,15 +65,21 @@ def __init__(
6465
condition: _Condition,
6566
ordering: Optional[_Ordering] = None,
6667
limit: Optional[int] = None,
67-
transaction_uuids: tuple[UUID, ...] = (NULL_UUID,),
68+
transaction_uuids: Iterable[UUID, ...] = (NULL_UUID,),
6869
exclude_deleted: bool = False,
70+
table_name: Optional[str] = None,
6971
):
72+
if not isinstance(transaction_uuids, tuple):
73+
transaction_uuids = tuple(transaction_uuids)
74+
if table_name is None:
75+
table_name = "snapshot"
7076
self.name = name
7177
self.condition = condition
7278
self.ordering = ordering
7379
self.limit = limit
7480
self.transaction_uuids = transaction_uuids
7581
self.exclude_deleted = exclude_deleted
82+
self.table_name = table_name
7683
self._parameters = None
7784

7885
def build(self) -> tuple[Composable, dict[str, Any]]:
@@ -116,7 +123,9 @@ def _build_select_from(self) -> Composable:
116123
self._parameters[name] = transaction_uuid
117124

118125
from_query_parts.append(
119-
self._SELECT_TRANSACTION_CHUNK.format(index=Literal(index), transaction_uuid=Placeholder(name))
126+
self._SELECT_TRANSACTION_CHUNK.format(
127+
index=Literal(index), transaction_uuid=Placeholder(name), table_name=Identifier(self.table_name)
128+
)
120129
)
121130

122131
from_query = SQL(" UNION ALL ").join(from_query_parts)
@@ -264,7 +273,7 @@ def generate_random_str() -> str:
264273

265274
_SELECT_TRANSACTION_CHUNK = SQL(
266275
"SELECT {index} AS transaction_index, * "
267-
"FROM snapshot "
276+
"FROM {table_name} "
268277
"WHERE name = %(name)s AND transaction_uuid = {transaction_uuid} "
269278
)
270279

0 commit comments

Comments
 (0)