Skip to content

Commit f990278

Browse files
author
Sergio García Prado
committed
ISSUE #98
* Rename `submit_query` as `execute_on_database`. * Rename `submit_query_and_fetchone` as `execute_on_database_and_fetch_one`. * Rename `submit_query_and_fetchall` as `execute_on_database_and_fetch_all`.
1 parent 9649f2e commit f990278

File tree

12 files changed

+85
-83
lines changed
  • packages/core
    • minos-microservice-aggregate/minos/aggregate
      • events/repositories/database
      • snapshots/repositories/database
      • transactions/repositories/database
    • minos-microservice-common
    • minos-microservice-networks

12 files changed

+85
-83
lines changed

packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/database/impl.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ async def _setup(self):
4141
4242
:return: This method does not return anything.
4343
"""
44-
operation = self.operation_factory.build_create()
45-
await self.submit_query(operation)
44+
operation = self.database_operation_factory.build_create()
45+
await self.execute_on_database(operation)
4646

4747
async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:
4848
operation = await self._build_submit_operation(entry)
4949

5050
try:
51-
response = await self.submit_query_and_fetchone(operation)
51+
response = await self.execute_on_database_and_fetch_one(operation)
5252
except IntegrityException:
5353
raise EventRepositoryConflictException(
5454
f"{entry!r} could not be submitted due to a key (uuid, version, transaction) collision",
@@ -69,15 +69,17 @@ async def _build_submit_operation(self, entry: EventEntry) -> DatabaseOperation:
6969
else:
7070
transaction_uuids = (NULL_UUID,)
7171

72-
return self.operation_factory.build_submit(transaction_uuids=transaction_uuids, **entry.as_raw(), lock=lock)
72+
return self.database_operation_factory.build_submit(
73+
transaction_uuids=transaction_uuids, **entry.as_raw(), lock=lock
74+
)
7375

7476
async def _select(self, streaming_mode: Optional[bool] = None, **kwargs) -> AsyncIterator[EventEntry]:
75-
operation = self.operation_factory.build_query(**kwargs)
76-
async for row in self.submit_query_and_iter(operation, streaming_mode=streaming_mode):
77+
operation = self.database_operation_factory.build_query(**kwargs)
78+
async for row in self.execute_on_database_and_fetch_all(operation, streaming_mode=streaming_mode):
7779
yield EventEntry(*row)
7880

7981
@property
8082
async def _offset(self) -> int:
81-
operation = self.operation_factory.build_query_offset()
82-
row = await self.submit_query_and_fetchone(operation)
83+
operation = self.database_operation_factory.build_query_offset()
84+
row = await self.execute_on_database_and_fetch_one(operation)
8385
return row[0] or 0

packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/repositories/database/impl.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ def __init__(
8787
self._transaction_repository = transaction_repository
8888

8989
async def _setup(self) -> None:
90-
operation = self.operation_factory.build_create()
91-
await self.submit_query(operation)
90+
operation = self.database_operation_factory.build_create()
91+
await self.execute_on_database(operation)
9292

9393
async def _destroy(self) -> None:
9494
await super()._destroy()
@@ -110,11 +110,11 @@ async def _find_entries(
110110
else:
111111
transaction_uuids = await transaction.uuids
112112

113-
operation = self.operation_factory.build_query(
113+
operation = self.database_operation_factory.build_query(
114114
name, condition, ordering, limit, transaction_uuids, exclude_deleted
115115
)
116116

117-
async for row in self.submit_query_and_iter(operation, streaming_mode=streaming_mode):
117+
async for row in self.execute_on_database_and_fetch_all(operation, streaming_mode=streaming_mode):
118118
yield SnapshotEntry(*row)
119119

120120
async def is_synced(self, name: str, **kwargs) -> bool:
@@ -148,17 +148,17 @@ async def _synchronize(self, **kwargs) -> None:
148148
await self._store_offset(offset)
149149

150150
async def _load_offset(self) -> int:
151-
operation = self.operation_factory.build_query_offset()
151+
operation = self.database_operation_factory.build_query_offset()
152152
# noinspection PyBroadException
153153
try:
154-
row = await self.submit_query_and_fetchone(operation)
154+
row = await self.execute_on_database_and_fetch_one(operation)
155155
except ProgrammingException:
156156
return 0
157157
return row[0]
158158

159159
async def _store_offset(self, offset: int) -> None:
160-
operation = self.operation_factory.build_submit_offset(offset)
161-
await self.submit_query(operation)
160+
operation = self.database_operation_factory.build_submit_offset(offset)
161+
await self.execute_on_database(operation)
162162

163163
async def _dispatch_one(self, event_entry: EventEntry, **kwargs) -> SnapshotEntry:
164164
if event_entry.action.is_delete:
@@ -210,8 +210,8 @@ async def _select_one_instance(self, name: str, uuid: UUID, **kwargs) -> RootEnt
210210
return snapshot_entry.build(**kwargs)
211211

212212
async def _submit_entry(self, snapshot_entry: SnapshotEntry) -> SnapshotEntry:
213-
operation = self.operation_factory.build_submit(**snapshot_entry.as_raw())
214-
response = await self.submit_query_and_fetchone(operation)
213+
operation = self.database_operation_factory.build_submit(**snapshot_entry.as_raw())
214+
response = await self.execute_on_database_and_fetch_one(operation)
215215

216216
snapshot_entry.created_at, snapshot_entry.updated_at = response
217217

@@ -223,5 +223,5 @@ async def _clean_transactions(self, offset: int, **kwargs) -> None:
223223
)
224224
transaction_uuids = {transaction.uuid async for transaction in iterable}
225225
if len(transaction_uuids):
226-
operation = self.operation_factory.build_delete(transaction_uuids)
227-
await self.submit_query(operation)
226+
operation = self.database_operation_factory.build_delete(transaction_uuids)
227+
await self.execute_on_database(operation)

packages/core/minos-microservice-aggregate/minos/aggregate/transactions/repositories/database/impl.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ def __init__(self, *args, database_key: Optional[tuple[str]] = None, **kwargs):
3535
super().__init__(*args, database_key=database_key, **kwargs)
3636

3737
async def _setup(self):
38-
operation = self.operation_factory.build_create()
39-
await self.submit_query(operation)
38+
operation = self.database_operation_factory.build_create()
39+
await self.execute_on_database(operation)
4040

4141
async def _submit(self, transaction: TransactionEntry) -> TransactionEntry:
42-
operation = self.operation_factory.build_submit(
42+
operation = self.database_operation_factory.build_submit(
4343
**transaction.as_raw(),
4444
)
4545

4646
try:
47-
updated_at = await self.submit_query_and_fetchone(operation)
47+
updated_at = await self.execute_on_database_and_fetch_one(operation)
4848
except ProgrammingException:
4949
raise TransactionRepositoryConflictException(
5050
f"{transaction!r} status is invalid respect to the previous one."
@@ -53,6 +53,6 @@ async def _submit(self, transaction: TransactionEntry) -> TransactionEntry:
5353
return transaction
5454

5555
async def _select(self, streaming_mode: Optional[bool] = None, **kwargs) -> AsyncIterator[TransactionEntry]:
56-
operation = self.operation_factory.build_query(**kwargs)
57-
async for row in self.submit_query_and_iter(operation, streaming_mode=streaming_mode):
56+
operation = self.database_operation_factory.build_query(**kwargs)
57+
async for row in self.execute_on_database_and_fetch_all(operation, streaming_mode=streaming_mode):
5858
yield TransactionEntry(*row, transaction_repository=self)

packages/core/minos-microservice-common/minos/common/database/mixins.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def _get_pool_from_factory(pool_factory: PoolFactory, database_key: Optional[tup
7979
return pool_factory.get_pool(type_="database")
8080

8181
@property
82-
def operation_factory(self) -> Optional[GenericDatabaseOperationFactory]:
82+
def database_operation_factory(self) -> Optional[GenericDatabaseOperationFactory]:
8383
"""Get the operation factory if any.
8484
8585
:return: A ``OperationFactory`` if it has been set or ``None`` otherwise.
@@ -101,8 +101,8 @@ def _get_generic_operation_factory(self) -> Optional[type[GenericDatabaseOperati
101101
raise TypeError(f"{type(self)!r} must contain a {DatabaseOperationFactory!r} as generic value.")
102102
return operation_factory_cls
103103

104-
async def submit_query_and_fetchone(self, operation: DatabaseOperation) -> tuple:
105-
"""Submit a SQL query and gets the first response.
104+
async def execute_on_database_and_fetch_one(self, operation: DatabaseOperation) -> tuple:
105+
"""Submit an Operation and get the first response.
106106
107107
:param operation: The operation to be executed.
108108
:return: This method does not return anything.
@@ -112,10 +112,10 @@ async def submit_query_and_fetchone(self, operation: DatabaseOperation) -> tuple
112112
return await client.fetch_one()
113113

114114
# noinspection PyUnusedLocal
115-
async def submit_query_and_iter(
115+
async def execute_on_database_and_fetch_all(
116116
self, operation: DatabaseOperation, streaming_mode: Optional[bool] = None
117117
) -> AsyncIterator[tuple]:
118-
"""Submit a SQL query and return an asynchronous iterator.
118+
"""Submit an Operation and return an asynchronous iterator.
119119
120120
:param operation: The operation to be executed.
121121
:param streaming_mode: If ``True`` return the values in streaming directly from the database (keep an open
@@ -139,8 +139,8 @@ async def submit_query_and_iter(
139139
yield value
140140

141141
# noinspection PyUnusedLocal
142-
async def submit_query(self, operation: DatabaseOperation) -> None:
143-
"""Submit a SQL query.
142+
async def execute_on_database(self, operation: DatabaseOperation) -> None:
143+
"""Submit an Operation.
144144
145145
:param operation: The operation to be executed.
146146
:return: This method does not return anything.

packages/core/minos-microservice-common/tests/test_common/test_database/test_mixins.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,22 @@ async def test_pool(self):
5252
async def test_operation_factory(self):
5353
operation_factory = MockedLockDatabaseOperationFactory()
5454
mixin = DatabaseMixin(operation_factory=operation_factory)
55-
self.assertEqual(operation_factory, mixin.operation_factory)
55+
self.assertEqual(operation_factory, mixin.database_operation_factory)
5656

5757
async def test_operation_factory_from_cls_init(self):
5858
mixin = DatabaseMixin(operation_factory_cls=LockDatabaseOperationFactory)
59-
self.assertIsInstance(mixin.operation_factory, MockedLockDatabaseOperationFactory)
59+
self.assertIsInstance(mixin.database_operation_factory, MockedLockDatabaseOperationFactory)
6060

6161
async def test_operation_factory_from_cls_generic(self):
6262
class _DatabaseMixin(DatabaseMixin[LockDatabaseOperationFactory]):
6363
"""For testing purposes."""
6464

6565
mixin = _DatabaseMixin()
66-
self.assertIsInstance(mixin.operation_factory, MockedLockDatabaseOperationFactory)
66+
self.assertIsInstance(mixin.database_operation_factory, MockedLockDatabaseOperationFactory)
6767

6868
async def test_operation_factory_none(self):
6969
mixin = DatabaseMixin()
70-
self.assertEqual(None, mixin.operation_factory)
70+
self.assertEqual(None, mixin.database_operation_factory)
7171

7272
async def test_operation_factory_from_cls_generic_raises(self):
7373
class _DatabaseMixin(DatabaseMixin[int]):
@@ -76,76 +76,76 @@ class _DatabaseMixin(DatabaseMixin[int]):
7676
with self.assertRaises(TypeError):
7777
_DatabaseMixin()
7878

79-
async def test_submit_query(self):
79+
async def test_execute_on_database(self):
8080
op1 = MockedDatabaseOperation("create_table")
8181
op2 = MockedDatabaseOperation("check_exist", [(True,)])
8282

8383
async with DatabaseMixin() as database:
84-
await database.submit_query(op1)
84+
await database.execute_on_database(op1)
8585

8686
async with MockedDatabaseClient(**self.config.get_default_database()) as client:
8787
await client.execute(op2)
8888
self.assertTrue((await client.fetch_one())[0])
8989

90-
async def test_submit_query_locked(self):
90+
async def test_execute_on_database_locked(self):
9191
op1 = MockedDatabaseOperation("create_table", lock=1234)
9292
op2 = MockedDatabaseOperation("check_exist", [(True,)])
9393

9494
async with DatabaseMixin() as database:
95-
await database.submit_query(op1)
95+
await database.execute_on_database(op1)
9696

9797
async with MockedDatabaseClient(**self.config.get_default_database()) as client:
9898
await client.execute(op2)
9999
self.assertTrue((await client.fetch_one())[0])
100100

101-
async def test_submit_query_and_fetchone(self):
101+
async def test_execute_on_database_and_fetch_one(self):
102102
op1 = MockedDatabaseOperation("create_table")
103103
op2 = MockedDatabaseOperation("insert")
104104
op3 = MockedDatabaseOperation("select", [(3,), (4,), (5,)])
105105

106106
async with DatabaseMixin() as database:
107-
await database.submit_query(op1)
108-
await database.submit_query(op2)
107+
await database.execute_on_database(op1)
108+
await database.execute_on_database(op2)
109109

110-
observed = await database.submit_query_and_fetchone(op3)
110+
observed = await database.execute_on_database_and_fetch_one(op3)
111111

112112
self.assertEqual((3,), observed)
113113

114-
async def test_submit_query_and_iter(self):
114+
async def test_execute_on_database_and_fetch_all(self):
115115
op1 = MockedDatabaseOperation("create_table")
116116
op2 = MockedDatabaseOperation("insert")
117117
op3 = MockedDatabaseOperation("select", [(3,), (4,), (5,)])
118118

119119
async with DatabaseMixin() as database:
120-
await database.submit_query(op1)
121-
await database.submit_query(op2)
122-
observed = [v async for v in database.submit_query_and_iter(op3)]
120+
await database.execute_on_database(op1)
121+
await database.execute_on_database(op2)
122+
observed = [v async for v in database.execute_on_database_and_fetch_all(op3)]
123123

124124
self.assertEqual([(3,), (4,), (5,)], observed)
125125

126-
async def test_submit_query_and_iter_streaming_mode_true(self):
126+
async def test_execute_on_database_and_fetch_all_streaming_mode_true(self):
127127
op1 = MockedDatabaseOperation("create_table")
128128
op2 = MockedDatabaseOperation("insert")
129129
op3 = MockedDatabaseOperation("select", [(3,), (4,), (5,)])
130130

131131
async with DatabaseMixin() as database:
132-
await database.submit_query(op1)
133-
await database.submit_query(op2)
132+
await database.execute_on_database(op1)
133+
await database.execute_on_database(op2)
134134

135-
observed = [v async for v in database.submit_query_and_iter(op3, streaming_mode=True)]
135+
observed = [v async for v in database.execute_on_database_and_fetch_all(op3, streaming_mode=True)]
136136

137137
self.assertEqual([(3,), (4,), (5,)], observed)
138138

139-
async def test_submit_query_and_iter_locked(self):
139+
async def test_execute_on_database_and_fetch_all_locked(self):
140140
op1 = MockedDatabaseOperation("create_table", lock=1234)
141141
op2 = MockedDatabaseOperation("insert")
142142
op3 = MockedDatabaseOperation("select", [(3,), (4,), (5,)])
143143

144144
async with DatabaseMixin() as database:
145-
await database.submit_query(op1)
146-
await database.submit_query(op2)
145+
await database.execute_on_database(op1)
146+
await database.execute_on_database(op2)
147147

148-
observed = [v async for v in database.submit_query_and_iter(op3)]
148+
observed = [v async for v in database.execute_on_database_and_fetch_all(op3)]
149149

150150
self.assertEqual([(3,), (4,), (5,)], observed)
151151

packages/core/minos-microservice-networks/minos/networks/brokers/collections/queues/database/impl.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ async def _destroy(self) -> None:
120120
await super()._destroy()
121121

122122
async def _create_table(self) -> None:
123-
operation = self.operation_factory.build_create()
124-
await self.submit_query(operation)
123+
operation = self.database_operation_factory.build_create()
124+
await self.execute_on_database(operation)
125125

126126
async def _start_run(self) -> None:
127127
if self._run_task is None:
@@ -142,13 +142,13 @@ async def _flush_queue(self):
142142
entry = self._queue.get_nowait()
143143
except QueueEmpty:
144144
break
145-
operation = self.operation_factory.build_mark_processed(entry.id_)
146-
await self.submit_query(operation)
145+
operation = self.database_operation_factory.build_mark_processed(entry.id_)
146+
await self.execute_on_database(operation)
147147
self._queue.task_done()
148148

149149
async def _enqueue(self, message: BrokerMessage) -> None:
150-
operation = self.operation_factory.build_submit(message.topic, message.avro_bytes)
151-
await self.submit_query(operation)
150+
operation = self.database_operation_factory.build_submit(message.topic, message.avro_bytes)
151+
await self.execute_on_database(operation)
152152
await self._notify_enqueued(message)
153153

154154
# noinspection PyUnusedLocal
@@ -166,12 +166,12 @@ async def _dequeue(self) -> BrokerMessage:
166166
logger.warning(
167167
f"There was a problem while trying to deserialize the entry with {entry.id_!r} id: {exc}"
168168
)
169-
operation = self.operation_factory.build_mark_processed(entry.id_)
170-
await self.submit_query(operation)
169+
operation = self.database_operation_factory.build_mark_processed(entry.id_)
170+
await self.execute_on_database(operation)
171171
continue
172172

173-
operation = self.operation_factory.build_delete(entry.id_)
174-
await self.submit_query(operation)
173+
operation = self.database_operation_factory.build_delete(entry.id_)
174+
await self.execute_on_database(operation)
175175
return message
176176
finally:
177177
self._queue.task_done()
@@ -195,8 +195,8 @@ async def _wait_enqueued(self) -> None:
195195

196196
async def _get_count(self) -> int:
197197
# noinspection PyTypeChecker
198-
operation = self.operation_factory.build_count(self.retry)
199-
row = await self.submit_query_and_fetchone(operation)
198+
operation = self.database_operation_factory.build_count(self.retry)
199+
row = await self.execute_on_database_and_fetch_one(operation)
200200
count = row[0]
201201
return count
202202

@@ -210,14 +210,14 @@ async def _dequeue_batch(self) -> None:
210210
entries = [_Entry(*row) for row in rows]
211211

212212
ids = tuple(entry.id_ for entry in entries)
213-
operation = self.operation_factory.build_mark_processing(ids)
213+
operation = self.database_operation_factory.build_mark_processing(ids)
214214
await client.execute(operation)
215215

216216
for entry in entries:
217217
await self._queue.put(entry)
218218

219219
async def _dequeue_rows(self, client: DatabaseClient) -> list[Any]:
220-
operation = self.operation_factory.build_query(self._retry, self._records)
220+
operation = self.database_operation_factory.build_query(self._retry, self._records)
221221
await client.execute(operation)
222222
return [row async for row in client.fetch_all()]
223223

packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/filtered/validators/duplicates/database/impl.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ async def _setup(self) -> None:
4040
await self._create_table()
4141

4242
async def _create_table(self) -> None:
43-
operation = self.operation_factory.build_create()
44-
await self.submit_query(operation)
43+
operation = self.database_operation_factory.build_create()
44+
await self.execute_on_database(operation)
4545

4646
async def _is_unique(self, topic: str, uuid: UUID) -> bool:
47-
operation = self.operation_factory.build_submit(topic, uuid)
47+
operation = self.database_operation_factory.build_submit(topic, uuid)
4848
try:
49-
await self.submit_query(operation)
49+
await self.execute_on_database(operation)
5050
return True
5151
except IntegrityException:
5252
return False

0 commit comments

Comments
 (0)