Skip to content

Commit 54774ca

Browse files
author
Sergio García Prado
authored
Merge pull request #368 from minos-framework/issue-367-add-database-client
#367 - Add `DatabaseClient`
2 parents 38e03ed + 889f7fa commit 54774ca

File tree

47 files changed

+1064
-533
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1064
-533
lines changed

packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/pg.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@
1010
UUID,
1111
)
1212

13-
from psycopg2 import (
14-
IntegrityError,
15-
)
1613
from psycopg2.sql import (
1714
SQL,
1815
Composable,
@@ -24,6 +21,7 @@
2421
NULL_UUID,
2522
Config,
2623
DatabaseMixin,
24+
IntegrityException,
2725
)
2826

2927
from ...exceptions import (
@@ -42,7 +40,7 @@ class PostgreSqlEventRepository(DatabaseMixin, EventRepository):
4240

4341
@classmethod
4442
def _from_config(cls, config: Config, **kwargs) -> Optional[EventRepository]:
45-
return super()._from_config(config, **config.get_database_by_name("event"), **kwargs)
43+
return super()._from_config(config, database_key=None, **kwargs)
4644

4745
async def _setup(self):
4846
"""Setup miscellaneous repository thing.
@@ -65,7 +63,7 @@ async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:
6563

6664
try:
6765
response = await self.submit_query_and_fetchone(query, params, lock=lock)
68-
except IntegrityError:
66+
except IntegrityException:
6967
raise EventRepositoryConflictException(
7068
f"{entry!r} could not be submitted due to a key (uuid, version, transaction) collision",
7169
await self.offset,

packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/abc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class PostgreSqlSnapshotSetup(DatabaseMixin):
1818

1919
@classmethod
2020
def _from_config(cls: Type[T], config: Config, **kwargs) -> T:
21-
return cls(**config.get_database_by_name("snapshot"), **kwargs)
21+
return cls(database_key=None, **kwargs)
2222

2323
async def _setup(self) -> None:
2424
await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp")

packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def _from_config(cls, config: Config, **kwargs) -> PostgreSqlSnapshotRepository:
5151
if "writer" not in kwargs:
5252
kwargs["writer"] = PostgreSqlSnapshotWriter.from_config(config, **kwargs)
5353

54-
return cls(**config.get_database_by_name("snapshot"), **kwargs)
54+
return cls(database_key=None, **kwargs)
5555

5656
async def _setup(self) -> None:
5757
await self.writer.setup()

packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/readers.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,10 @@ async def find_entries(
128128
qb = PostgreSqlSnapshotQueryBuilder(name, condition, ordering, limit, transaction_uuids, exclude_deleted)
129129
query, parameters = qb.build()
130130

131-
async with self.cursor() as cursor:
132-
# noinspection PyTypeChecker
133-
await cursor.execute(query, parameters)
134-
if streaming_mode:
135-
async for row in cursor:
136-
# noinspection PyArgumentList
137-
yield SnapshotEntry(*row)
138-
return
139-
else:
140-
rows = await cursor.fetchall()
141-
for row in rows:
142-
yield SnapshotEntry(*row)
131+
async_iterable = self.submit_query_and_iter(query, parameters)
132+
if streaming_mode:
133+
async for row in async_iterable:
134+
yield SnapshotEntry(*row)
135+
else:
136+
for row in [row async for row in async_iterable]:
137+
yield SnapshotEntry(*row)

packages/core/minos-microservice-aggregate/minos/aggregate/transactions/repositories/pg.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class PostgreSqlTransactionRepository(DatabaseMixin, TransactionRepository):
3434

3535
@classmethod
3636
def _from_config(cls, config: Config, **kwargs) -> Optional[PostgreSqlTransactionRepository]:
37-
return super()._from_config(config, **config.get_database_by_name("transaction"), **kwargs)
37+
return super()._from_config(config, database_key=None, **kwargs)
3838

3939
async def _setup(self):
4040
await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp")

packages/core/minos-microservice-aggregate/tests/test_aggregate/test_events/test_repositories/test_pg.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import unittest
22

3-
import aiopg
4-
53
from minos.aggregate import (
64
EventRepository,
75
PostgreSqlEventRepository,
86
)
97
from minos.common import (
8+
AiopgDatabaseClient,
109
DatabaseClientPool,
1110
)
1211
from minos.common.testing import (
@@ -18,6 +17,7 @@
1817
)
1918

2019

20+
# noinspection SqlNoDataSourceInspection
2121
class TestPostgreSqlEventRepositorySubmit(EventRepositorySubmitTestCase, PostgresAsyncTestCase):
2222
__test__ = True
2323

@@ -30,20 +30,18 @@ def test_constructor(self):
3030
pool = DatabaseClientPool.from_config(self.config)
3131
repository = PostgreSqlEventRepository(pool)
3232
self.assertIsInstance(repository, PostgreSqlEventRepository)
33-
self.assertEqual(pool, repository.pool)
33+
self.assertIsInstance(repository.pool, DatabaseClientPool)
3434

3535
def test_from_config(self):
3636
repository = PostgreSqlEventRepository.from_config(self.config)
37-
repository_config = self.config.get_database_by_name("event")
38-
self.assertEqual(repository_config["database"], repository.pool.database)
37+
self.assertIsInstance(repository.pool, DatabaseClientPool)
3938

4039
async def test_setup(self):
41-
async with aiopg.connect(**self.config.get_default_database()) as connection:
42-
async with connection.cursor() as cursor:
43-
await cursor.execute(
44-
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_event');"
45-
)
46-
response = (await cursor.fetchone())[0]
40+
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
41+
await client.execute(
42+
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_event');"
43+
)
44+
response = (await client.fetch_one())[0]
4745
self.assertTrue(response)
4846

4947

packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_abc.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import unittest
22

3-
import aiopg
4-
53
from minos.aggregate import (
64
PostgreSqlSnapshotSetup,
75
)
6+
from minos.common import (
7+
AiopgDatabaseClient,
8+
)
89
from minos.common.testing import (
910
PostgresAsyncTestCase,
1011
)
@@ -13,27 +14,25 @@
1314
)
1415

1516

17+
# noinspection SqlNoDataSourceInspection
1618
class TestPostgreSqlSnapshotSetup(AggregateTestCase, PostgresAsyncTestCase):
1719
async def test_setup_snapshot_table(self):
1820
async with PostgreSqlSnapshotSetup.from_config(self.config):
19-
async with aiopg.connect(**self.config.get_default_database()) as connection:
20-
async with connection.cursor() as cursor:
21-
await cursor.execute(
22-
"SELECT EXISTS (SELECT FROM pg_tables "
23-
"WHERE schemaname = 'public' AND tablename = 'snapshot');"
24-
)
25-
observed = (await cursor.fetchone())[0]
21+
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
22+
await client.execute(
23+
"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'snapshot');"
24+
)
25+
observed = (await client.fetch_one())[0]
2626
self.assertEqual(True, observed)
2727

2828
async def test_setup_snapshot_aux_offset_table(self):
2929
async with PostgreSqlSnapshotSetup.from_config(self.config):
30-
async with aiopg.connect(**self.config.get_default_database()) as connection:
31-
async with connection.cursor() as cursor:
32-
await cursor.execute(
33-
"SELECT EXISTS (SELECT FROM pg_tables WHERE "
34-
"schemaname = 'public' AND tablename = 'snapshot_aux_offset');"
35-
)
36-
observed = (await cursor.fetchone())[0]
30+
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
31+
await client.execute(
32+
"SELECT EXISTS (SELECT FROM pg_tables WHERE "
33+
"schemaname = 'public' AND tablename = 'snapshot_aux_offset');"
34+
)
35+
observed = (await client.fetch_one())[0]
3736
self.assertEqual(True, observed)
3837

3938

packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_queries.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
uuid4,
88
)
99

10-
import aiopg
1110
from psycopg2.extras import (
1211
Json,
1312
)
@@ -29,6 +28,7 @@
2928
)
3029
from minos.common import (
3130
NULL_UUID,
31+
AiopgDatabaseClient,
3232
)
3333
from minos.common.testing import (
3434
PostgresAsyncTestCase,
@@ -426,8 +426,8 @@ async def test_build_complex(self):
426426
self.assertEqual(self._flatten_parameters(expected_parameters), self._flatten_parameters(observed[1]))
427427

428428
async def _flatten_query(self, query) -> str:
429-
async with aiopg.connect(**self.config.get_default_database()) as connection:
430-
return query.as_string(connection.raw)
429+
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
430+
return query.as_string(client.connection.raw)
431431

432432
@staticmethod
433433
def _flatten_parameters(parameters) -> dict:

packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_readers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
TransactionEntry,
2222
TransactionStatus,
2323
)
24+
from minos.common import (
25+
DatabaseClientPool,
26+
)
2427
from minos.common.testing import (
2528
PostgresAsyncTestCase,
2629
)
@@ -94,8 +97,7 @@ def test_type(self):
9497

9598
def test_from_config(self):
9699
reader = PostgreSqlSnapshotReader.from_config(self.config)
97-
snapshot_config = self.config.get_database_by_name("snapshot")
98-
self.assertEqual(snapshot_config["database"], reader.pool.database)
100+
self.assertIsInstance(reader.pool, DatabaseClientPool)
99101

100102
async def test_find_by_uuid(self):
101103
condition = Condition.IN("uuid", [self.uuid_2, self.uuid_3])

packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_writers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
TransactionStatus,
2727
)
2828
from minos.common import (
29+
DatabaseClientPool,
2930
NotProvidedException,
3031
current_datetime,
3132
)
@@ -101,8 +102,7 @@ def test_type(self):
101102
self.assertTrue(issubclass(PostgreSqlSnapshotWriter, PostgreSqlSnapshotSetup))
102103

103104
def test_from_config(self):
104-
snapshot_config = self.config.get_database_by_name("snapshot")
105-
self.assertEqual(snapshot_config["database"], self.writer.pool.database)
105+
self.assertIsInstance(self.writer.pool, DatabaseClientPool)
106106

107107
def test_from_config_raises(self):
108108
with self.assertRaises(NotProvidedException):

0 commit comments

Comments
 (0)