Skip to content

Commit f98e0a9

Browse files
author
Sergio García Prado
authored
Merge pull request #396 from minos-framework/issue-43-implement-aiopg-circuit-breaker
#43 - Implement circuit breaker for `aiopg` connection failures
2 parents 0313fbd + 0d32411 commit f98e0a9

File tree

8 files changed

+153
-58
lines changed

8 files changed

+153
-58
lines changed

packages/core/minos-microservice-common/minos/common/database/clients/abc.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,8 @@ async def is_valid(self, **kwargs) -> bool:
6666
"""
6767
return await self._is_valid(**kwargs)
6868

69-
@abstractmethod
7069
async def _is_valid(self, **kwargs) -> bool:
71-
raise NotImplementedError
70+
return True
7271

7372
async def _destroy(self) -> None:
7473
await self.reset()

packages/core/minos-microservice-common/minos/common/pools.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ def _get_pool_cls(self, type_: str) -> type[Pool]:
112112

113113

114114
class _PoolBase(PoolBase, ABC):
115-
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, **kwargs):
115+
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = None, **kwargs):
116116
super().__init__(maxsize=maxsize, recycle=recycle)
117117

118118

119119
class Pool(SetupMixin, _PoolBase, Generic[P], ABC):
120120
"""Base class for Pool implementations in minos"""
121121

122-
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, already_setup: bool = True, **kwargs):
123-
super().__init__(*args, maxsize=maxsize, recycle=recycle, already_setup=already_setup, **kwargs)
122+
def __init__(self, *args, already_setup: bool = True, **kwargs):
123+
super().__init__(*args, already_setup=already_setup, **kwargs)
124124

125125
# noinspection PyUnresolvedReferences
126126
async def __acquire(self) -> Any: # pragma: no cover

packages/core/minos-microservice-common/minos/common/testing/database/clients.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ def __init__(self, *args, **kwargs):
2121
self.kwargs = kwargs
2222
self._response = tuple()
2323

24-
async def _is_valid(self, **kwargs) -> bool:
25-
"""For testing purposes"""
26-
return True
27-
2824
async def _reset(self, **kwargs) -> None:
2925
"""For testing purposes"""
3026
self._response = tuple()

packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
class _DatabaseClient(DatabaseClient):
3434
"""For testing purposes."""
3535

36-
async def _is_valid(self, **kwargs) -> bool:
37-
"""For testing purposes."""
38-
3936
async def _reset(self, **kwargs) -> None:
4037
"""For testing purposes."""
4138

@@ -76,7 +73,7 @@ def build_release(self, hashed_key: int) -> DatabaseOperation:
7673
class TestDatabaseClient(CommonTestCase):
7774
def test_abstract(self):
7875
self.assertTrue(issubclass(DatabaseClient, (ABC, BuildableMixin)))
79-
expected = {"_is_valid", "_execute", "_fetch_all", "_reset"}
76+
expected = {"_execute", "_fetch_all", "_reset"}
8077
# noinspection PyUnresolvedReferences
8178
self.assertEqual(expected, DatabaseClient.__abstractmethods__)
8279

packages/core/minos-microservice-networks/minos/networks/brokers/pools.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@
3030
class BrokerClientPool(Pool):
3131
"""Broker Client Pool class."""
3232

33-
def __init__(
34-
self, instance_kwargs: dict[str, Any], maxsize: int = 5, recycle: Optional[int] = 3600, *args, **kwargs
35-
):
36-
super().__init__(maxsize=maxsize, recycle=recycle, *args, **kwargs)
33+
def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5, *args, **kwargs):
34+
super().__init__(maxsize=maxsize, *args, **kwargs)
3735
self._instance_kwargs = instance_kwargs
3836

3937
@classmethod

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py

Lines changed: 76 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,15 @@
33
)
44

55
import logging
6+
from asyncio import (
7+
TimeoutError,
8+
)
69
from collections.abc import (
710
AsyncIterator,
11+
Iterable,
12+
)
13+
from functools import (
14+
partial,
815
)
916
from typing import (
1017
Optional,
@@ -22,6 +29,7 @@
2229
)
2330

2431
from minos.common import (
32+
CircuitBreakerMixin,
2533
ConnectionException,
2634
DatabaseClient,
2735
IntegrityException,
@@ -35,7 +43,7 @@
3543
logger = logging.getLogger(__name__)
3644

3745

38-
class AiopgDatabaseClient(DatabaseClient):
46+
class AiopgDatabaseClient(DatabaseClient, CircuitBreakerMixin):
3947
"""Aiopg Database Client class."""
4048

4149
_connection: Optional[Connection]
@@ -48,10 +56,17 @@ def __init__(
4856
port: Optional[int] = None,
4957
user: Optional[str] = None,
5058
password: Optional[str] = None,
59+
circuit_breaker_exceptions: Iterable[type] = tuple(),
60+
connection_timeout: Optional[float] = None,
61+
cursor_timeout: Optional[float] = None,
5162
*args,
5263
**kwargs,
5364
):
54-
super().__init__(*args, **kwargs)
65+
super().__init__(
66+
*args,
67+
**kwargs,
68+
circuit_breaker_exceptions=(ConnectionException, *circuit_breaker_exceptions),
69+
)
5570

5671
if host is None:
5772
host = "localhost"
@@ -61,43 +76,71 @@ def __init__(
6176
user = "postgres"
6277
if password is None:
6378
password = ""
79+
if connection_timeout is None:
80+
connection_timeout = 1
81+
if cursor_timeout is None:
82+
cursor_timeout = 60
6483

6584
self._database = database
6685
self._host = host
6786
self._port = port
6887
self._user = user
6988
self._password = password
7089

90+
self._connection_timeout = connection_timeout
91+
self._cursor_timeout = cursor_timeout
92+
7193
self._connection = None
7294
self._cursor = None
7395

7496
async def _setup(self) -> None:
7597
await super()._setup()
76-
await self._create_connection()
98+
await self.recreate()
7799

78100
async def _destroy(self) -> None:
79101
await super()._destroy()
80-
await self._close_connection()
102+
await self.close()
103+
104+
async def recreate(self) -> None:
105+
"""Recreate the database connection.
106+
107+
:return: This method does not return anything.
108+
"""
109+
await self.close()
81110

82-
async def _create_connection(self):
111+
self._connection = await self.with_circuit_breaker(self._connect)
112+
logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")
113+
114+
async def _connect(self) -> Connection:
83115
try:
84-
self._connection = await aiopg.connect(
85-
host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password
116+
return await aiopg.connect(
117+
timeout=self._connection_timeout,
118+
host=self.host,
119+
port=self.port,
120+
dbname=self.database,
121+
user=self.user,
122+
password=self.password,
86123
)
87-
except OperationalError as exc:
88-
msg = f"There was an {exc!r} while trying to get a database connection."
89-
logger.warning(msg)
90-
raise ConnectionException(msg)
124+
except (OperationalError, TimeoutError) as exc:
125+
raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")
91126

92-
logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")
127+
async def close(self) -> None:
128+
"""Close database connection.
93129
94-
async def _close_connection(self):
95-
if self._connection is not None and not self._connection.closed:
130+
:return: This method does not return anything.
131+
"""
132+
if await self.is_connected():
96133
await self._connection.close()
97-
self._connection = None
98-
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
99134

100-
async def _is_valid(self) -> bool:
135+
if self._connection is not None:
136+
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
137+
self._connection = None
138+
139+
async def is_connected(self) -> bool:
140+
"""Check if the client is connected.
141+
142+
:return: ``True`` if it is connected or ``False`` otherwise.
143+
"""
101144
if self._connection is None:
102145
return False
103146

@@ -114,28 +157,37 @@ async def _reset(self, **kwargs) -> None:
114157

115158
# noinspection PyUnusedLocal
116159
async def _fetch_all(self) -> AsyncIterator[tuple]:
117-
await self._create_cursor()
160+
if self._cursor is None:
161+
raise ProgrammingException("An operation must be executed before fetching any value.")
162+
118163
try:
119164
async for row in self._cursor:
120165
yield row
121166
except ProgrammingError as exc:
122167
raise ProgrammingException(str(exc))
168+
except OperationalError as exc:
169+
raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")
123170

124171
# noinspection PyUnusedLocal
125172
async def _execute(self, operation: AiopgDatabaseOperation) -> None:
126173
if not isinstance(operation, AiopgDatabaseOperation):
127174
raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}")
128175

129-
await self._create_cursor()
176+
fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters)
177+
await self.with_circuit_breaker(fn)
178+
179+
async def _execute_cursor(self, operation: str, parameters: dict):
180+
if not await self.is_connected():
181+
await self.recreate()
182+
183+
self._cursor = await self._connection.cursor(timeout=self._cursor_timeout)
130184
try:
131-
await self._cursor.execute(operation=operation.query, parameters=operation.parameters)
185+
await self._cursor.execute(operation=operation, parameters=parameters)
186+
except OperationalError as exc:
187+
raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")
132188
except IntegrityError as exc:
133189
raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}")
134190

135-
async def _create_cursor(self):
136-
if self._cursor is None:
137-
self._cursor = await self._connection.cursor()
138-
139191
async def _destroy_cursor(self, **kwargs):
140192
if self._cursor is not None:
141193
if not self._cursor.closed:

0 commit comments

Comments
 (0)