Skip to content

Commit b90ddfe

Browse files
author
Sergio García Prado
committed
ISSUE #43
* Improve `AiopgDatabaseClient` adding support for connection recreation during breakages.
1 parent 6527dbb commit b90ddfe

File tree

8 files changed

+84
-49
lines changed

8 files changed

+84
-49
lines changed

packages/core/minos-microservice-common/minos/common/database/clients/abc.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,8 @@ async def is_valid(self, **kwargs) -> bool:
6666
"""
6767
return await self._is_valid(**kwargs)
6868

69-
@abstractmethod
7069
async def _is_valid(self, **kwargs) -> bool:
71-
raise NotImplementedError
70+
return True
7271

7372
async def _destroy(self) -> None:
7473
await self.reset()

packages/core/minos-microservice-common/minos/common/pools.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ def _get_pool_cls(self, type_: str) -> type[Pool]:
112112

113113

114114
class _PoolBase(PoolBase, ABC):
115-
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, **kwargs):
115+
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = None, **kwargs):
116116
super().__init__(maxsize=maxsize, recycle=recycle)
117117

118118

119119
class Pool(SetupMixin, _PoolBase, Generic[P], ABC):
120120
"""Base class for Pool implementations in minos"""
121121

122-
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, already_setup: bool = True, **kwargs):
123-
super().__init__(*args, maxsize=maxsize, recycle=recycle, already_setup=already_setup, **kwargs)
122+
def __init__(self, *args, already_setup: bool = True, **kwargs):
123+
super().__init__(*args, already_setup=already_setup, **kwargs)
124124

125125
# noinspection PyUnresolvedReferences
126126
async def __acquire(self) -> Any: # pragma: no cover

packages/core/minos-microservice-common/minos/common/testing/database/clients.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ def __init__(self, *args, **kwargs):
2121
self.kwargs = kwargs
2222
self._response = tuple()
2323

24-
async def _is_valid(self, **kwargs) -> bool:
25-
"""For testing purposes"""
26-
return True
27-
2824
async def _reset(self, **kwargs) -> None:
2925
"""For testing purposes"""
3026
self._response = tuple()

packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
class _DatabaseClient(DatabaseClient):
3434
"""For testing purposes."""
3535

36-
async def _is_valid(self, **kwargs) -> bool:
37-
"""For testing purposes."""
38-
3936
async def _reset(self, **kwargs) -> None:
4037
"""For testing purposes."""
4138

@@ -76,7 +73,7 @@ def build_release(self, hashed_key: int) -> DatabaseOperation:
7673
class TestDatabaseClient(CommonTestCase):
7774
def test_abstract(self):
7875
self.assertTrue(issubclass(DatabaseClient, (ABC, BuildableMixin)))
79-
expected = {"_is_valid", "_execute", "_fetch_all", "_reset"}
76+
expected = {"_execute", "_fetch_all", "_reset"}
8077
# noinspection PyUnresolvedReferences
8178
self.assertEqual(expected, DatabaseClient.__abstractmethods__)
8279

packages/core/minos-microservice-networks/minos/networks/brokers/pools.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@
3030
class BrokerClientPool(Pool):
3131
"""Broker Client Pool class."""
3232

33-
def __init__(
34-
self, instance_kwargs: dict[str, Any], maxsize: int = 5, recycle: Optional[int] = 3600, *args, **kwargs
35-
):
36-
super().__init__(maxsize=maxsize, recycle=recycle, *args, **kwargs)
33+
def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5, *args, **kwargs):
34+
super().__init__(maxsize=maxsize, *args, **kwargs)
3735
self._instance_kwargs = instance_kwargs
3836

3937
@classmethod

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
AsyncIterator,
1111
Iterable,
1212
)
13+
from functools import (
14+
partial,
15+
)
1316
from typing import (
1417
Optional,
1518
)
@@ -92,15 +95,20 @@ def __init__(
9295

9396
async def _setup(self) -> None:
9497
await super()._setup()
95-
await self._create_connection()
98+
await self.recreate()
9699

97100
async def _destroy(self) -> None:
98101
await super()._destroy()
99-
await self._close_connection()
102+
await self.close()
100103

101-
async def _create_connection(self):
102-
self._connection = await self.with_circuit_breaker(self._connect)
104+
async def recreate(self) -> None:
105+
"""Recreate the database connection.
103106
107+
:return: This method does not return anything.
108+
"""
109+
await self.close()
110+
111+
self._connection = await self.with_circuit_breaker(self._connect)
104112
logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")
105113

106114
async def _connect(self) -> Connection:
@@ -113,13 +121,23 @@ async def _connect(self) -> Connection:
113121
password=self.password,
114122
)
115123

116-
async def _close_connection(self):
117-
if await self.is_valid():
124+
async def close(self) -> None:
125+
"""Close database connection.
126+
127+
:return: This method does not return anything.
128+
"""
129+
if await self.is_connected():
118130
await self._connection.close()
119-
self._connection = None
120-
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
121131

122-
async def _is_valid(self) -> bool:
132+
if self._connection is not None:
133+
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
134+
self._connection = None
135+
136+
async def is_connected(self) -> bool:
137+
"""Check if the client is connected.
138+
139+
:return: ``True`` if it is connected or ``False`` otherwise.
140+
"""
123141
if self._connection is None:
124142
return False
125143

@@ -154,19 +172,18 @@ async def _execute(self, operation: AiopgDatabaseOperation) -> None:
154172
if not isinstance(operation, AiopgDatabaseOperation):
155173
raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}")
156174

157-
await self._create_cursor()
175+
fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters)
158176
try:
159-
await self._cursor.execute(operation=operation.query, parameters=operation.parameters)
177+
await self.with_circuit_breaker(fn)
160178
except IntegrityError as exc:
161179
raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}")
162-
except OperationalError as exc:
163-
msg = f"There was an {exc!r} while trying to connect to the database."
164-
logger.warning(msg)
165-
raise ConnectionException(msg)
166180

167-
async def _create_cursor(self):
168-
if self._cursor is None:
169-
self._cursor = await self._connection.cursor(timeout=self._cursor_timeout)
181+
async def _execute_cursor(self, operation: str, parameters: dict):
182+
if not await self.is_connected():
183+
await self.recreate()
184+
185+
self._cursor = await self._connection.cursor(timeout=self._cursor_timeout)
186+
await self._cursor.execute(operation=operation, parameters=parameters)
170187

171188
async def _destroy_cursor(self, **kwargs):
172189
if self._cursor is not None:

packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,27 @@ def test_from_config(self):
5454
self.assertEqual(default_database["host"], client.host)
5555
self.assertEqual(default_database["port"], client.port)
5656

57-
async def test_is_valid_true(self):
57+
async def test_is_valid(self):
5858
async with AiopgDatabaseClient.from_config(self.config) as client:
5959
self.assertTrue(await client.is_valid())
6060

61-
async def test_is_valid_false_not_setup(self):
61+
async def test_is_connected_true(self):
62+
async with AiopgDatabaseClient.from_config(self.config) as client:
63+
self.assertTrue(await client.is_connected())
64+
65+
async def test_is_connected_false_not_setup(self):
6266
client = AiopgDatabaseClient.from_config(self.config)
63-
self.assertFalse(await client.is_valid())
67+
self.assertFalse(await client.is_connected())
6468

65-
async def test_is_valid_false_operational_error(self):
69+
async def test_is_connected_false_operational_error(self):
6670
async with AiopgDatabaseClient.from_config(self.config) as client:
6771
with patch.object(Connection, "isolation_level", new_callable=PropertyMock, side_effect=OperationalError):
68-
self.assertFalse(await client.is_valid())
72+
self.assertFalse(await client.is_connected())
6973

70-
async def test_is_valid_false_closed(self):
74+
async def test_is_connected_false_closed(self):
7175
async with AiopgDatabaseClient.from_config(self.config) as client:
7276
with patch.object(Connection, "closed", new_callable=PropertyMock, return_valud=False):
73-
self.assertFalse(await client.is_valid())
77+
self.assertFalse(await client.is_connected())
7478

7579
async def test_connection(self):
7680
client = AiopgDatabaseClient.from_config(self.config)
@@ -79,7 +83,7 @@ async def test_connection(self):
7983
self.assertIsInstance(client.connection, Connection)
8084
self.assertIsNone(client.connection)
8185

82-
async def test_connection_raises(self):
86+
async def test_connection_with_circuit_breaker(self):
8387
async with AiopgDatabaseClient.from_config(self.config) as c1:
8488

8589
async def _fn():
@@ -89,6 +93,18 @@ async def _fn():
8993
async with AiopgDatabaseClient.from_config(self.config) as c2:
9094
self.assertEqual(c1.connection, c2.connection)
9195

96+
async def test_connection_recreate(self):
97+
async with AiopgDatabaseClient.from_config(self.config) as client:
98+
c1 = client.connection
99+
self.assertIsInstance(c1, Connection)
100+
101+
await client.recreate()
102+
103+
c2 = client.connection
104+
self.assertIsInstance(c2, Connection)
105+
106+
self.assertNotEqual(c1, c2)
107+
92108
async def test_cursor(self):
93109
client = AiopgDatabaseClient.from_config(self.config)
94110
self.assertIsNone(client.cursor)
@@ -116,6 +132,14 @@ async def test_execute(self):
116132
execute_mock.call_args_list,
117133
)
118134

135+
async def test_execute_disconnected(self):
136+
async with AiopgDatabaseClient.from_config(self.config) as client:
137+
await client.close()
138+
self.assertFalse(await client.is_connected())
139+
140+
await client.execute(self.operation)
141+
self.assertTrue(await client.is_connected())
142+
119143
async def test_execute_raises_unsupported(self):
120144
class _DatabaseOperation(DatabaseOperation):
121145
"""For testing purposes."""
@@ -132,9 +156,16 @@ async def test_execute_raises_integrity(self):
132156

133157
async def test_execute_raises_operational(self):
134158
async with AiopgDatabaseClient.from_config(self.config) as client:
135-
with patch.object(Cursor, "execute", side_effect=OperationalError):
136-
with self.assertRaises(ConnectionException):
137-
await client.execute(self.operation)
159+
with patch.object(Cursor, "execute", side_effect=(OperationalError, None)) as mock:
160+
await client.execute(self.operation)
161+
162+
self.assertEqual(
163+
[
164+
call(operation=self.operation.query, parameters=self.operation.parameters),
165+
call(operation=self.operation.query, parameters=self.operation.parameters),
166+
],
167+
mock.call_args_list,
168+
)
138169

139170
async def test_fetch_one(self):
140171
async with AiopgDatabaseClient.from_config(self.config) as client:

packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ def _close_environment(self) -> None:
7373
if self._environment is not None:
7474
self._environment.close()
7575

76-
async def _is_valid(self, **kwargs) -> bool:
77-
return True
78-
7976
async def _reset(self, **kwargs) -> None:
8077
self._prefetched = None
8178
self._environment.sync()

0 commit comments

Comments
 (0)