Skip to content

Commit 6527dbb

Browse files
author
Sergio García Prado
committed
ISSUE #43
* Implement circuit breaker for `aiopg` connection failures.
1 parent 7e2d88b commit 6527dbb

File tree

2 files changed

+76
-19
lines changed

2 files changed

+76
-19
lines changed

packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
)
44

55
import logging
6+
from asyncio import (
7+
TimeoutError,
8+
)
69
from collections.abc import (
710
AsyncIterator,
11+
Iterable,
812
)
913
from typing import (
1014
Optional,
@@ -22,6 +26,7 @@
2226
)
2327

2428
from minos.common import (
29+
CircuitBreakerMixin,
2530
ConnectionException,
2631
DatabaseClient,
2732
IntegrityException,
@@ -35,7 +40,7 @@
3540
logger = logging.getLogger(__name__)
3641

3742

38-
class AiopgDatabaseClient(DatabaseClient):
43+
class AiopgDatabaseClient(DatabaseClient, CircuitBreakerMixin):
3944
"""Aiopg Database Client class."""
4045

4146
_connection: Optional[Connection]
@@ -48,10 +53,17 @@ def __init__(
4853
port: Optional[int] = None,
4954
user: Optional[str] = None,
5055
password: Optional[str] = None,
56+
circuit_breaker_exceptions: Iterable[type] = tuple(),
57+
connection_timeout: Optional[float] = None,
58+
cursor_timeout: Optional[float] = None,
5159
*args,
5260
**kwargs,
5361
):
54-
super().__init__(*args, **kwargs)
62+
super().__init__(
63+
*args,
64+
**kwargs,
65+
circuit_breaker_exceptions=(OperationalError, TimeoutError, *circuit_breaker_exceptions),
66+
)
5567

5668
if host is None:
5769
host = "localhost"
@@ -61,13 +73,20 @@ def __init__(
6173
user = "postgres"
6274
if password is None:
6375
password = ""
76+
if connection_timeout is None:
77+
connection_timeout = 1
78+
if cursor_timeout is None:
79+
cursor_timeout = 60
6480

6581
self._database = database
6682
self._host = host
6783
self._port = port
6884
self._user = user
6985
self._password = password
7086

87+
self._connection_timeout = connection_timeout
88+
self._cursor_timeout = cursor_timeout
89+
7190
self._connection = None
7291
self._cursor = None
7392

@@ -80,19 +99,22 @@ async def _destroy(self) -> None:
8099
await self._close_connection()
81100

82101
async def _create_connection(self):
83-
try:
84-
self._connection = await aiopg.connect(
85-
host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password
86-
)
87-
except OperationalError as exc:
88-
msg = f"There was an {exc!r} while trying to get a database connection."
89-
logger.warning(msg)
90-
raise ConnectionException(msg)
102+
self._connection = await self.with_circuit_breaker(self._connect)
91103

92104
logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")
93105

106+
async def _connect(self) -> Connection:
107+
return await aiopg.connect(
108+
timeout=self._connection_timeout,
109+
host=self.host,
110+
port=self.port,
111+
dbname=self.database,
112+
user=self.user,
113+
password=self.password,
114+
)
115+
94116
async def _close_connection(self):
95-
if self._connection is not None and not self._connection.closed:
117+
if await self.is_valid():
96118
await self._connection.close()
97119
self._connection = None
98120
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
@@ -114,12 +136,18 @@ async def _reset(self, **kwargs) -> None:
114136

115137
# noinspection PyUnusedLocal
116138
async def _fetch_all(self) -> AsyncIterator[tuple]:
117-
await self._create_cursor()
139+
if self._cursor is None:
140+
raise ProgrammingException("An operation must be executed before fetching any value.")
141+
118142
try:
119143
async for row in self._cursor:
120144
yield row
121145
except ProgrammingError as exc:
122146
raise ProgrammingException(str(exc))
147+
except OperationalError as exc:
148+
msg = f"There was an {exc!r} while trying to connect to the database."
149+
logger.warning(msg)
150+
raise ConnectionException(msg)
123151

124152
# noinspection PyUnusedLocal
125153
async def _execute(self, operation: AiopgDatabaseOperation) -> None:
@@ -131,10 +159,14 @@ async def _execute(self, operation: AiopgDatabaseOperation) -> None:
131159
await self._cursor.execute(operation=operation.query, parameters=operation.parameters)
132160
except IntegrityError as exc:
133161
raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}")
162+
except OperationalError as exc:
163+
msg = f"There was an {exc!r} while trying to connect to the database."
164+
logger.warning(msg)
165+
raise ConnectionException(msg)
134166

135167
async def _create_cursor(self):
136168
if self._cursor is None:
137-
self._cursor = await self._connection.cursor()
169+
self._cursor = await self._connection.cursor(timeout=self._cursor_timeout)
138170

139171
async def _destroy_cursor(self, **kwargs):
140172
if self._cursor is not None:

packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from psycopg2 import (
1414
IntegrityError,
1515
OperationalError,
16+
ProgrammingError,
1617
)
1718

1819
from minos.common import (
@@ -30,7 +31,7 @@
3031
)
3132

3233

33-
# noinspection SqlNoDataSourceInspection
34+
# noinspection SqlNoDataSourceInspection,SqlDialectInspection
3435
class TestAiopgDatabaseClient(AiopgTestCase):
3536
def setUp(self):
3637
super().setUp()
@@ -79,10 +80,14 @@ async def test_connection(self):
7980
self.assertIsNone(client.connection)
8081

8182
async def test_connection_raises(self):
82-
with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=OperationalError):
83-
with self.assertRaises(ConnectionException):
84-
async with AiopgDatabaseClient.from_config(self.config):
85-
pass
83+
async with AiopgDatabaseClient.from_config(self.config) as c1:
84+
85+
async def _fn():
86+
return c1.connection
87+
88+
with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=(OperationalError, _fn())):
89+
async with AiopgDatabaseClient.from_config(self.config) as c2:
90+
self.assertEqual(c1.connection, c2.connection)
8691

8792
async def test_cursor(self):
8893
client = AiopgDatabaseClient.from_config(self.config)
@@ -125,17 +130,37 @@ async def test_execute_raises_integrity(self):
125130
with self.assertRaises(IntegrityException):
126131
await client.execute(self.operation)
127132

133+
async def test_execute_raises_operational(self):
134+
async with AiopgDatabaseClient.from_config(self.config) as client:
135+
with patch.object(Cursor, "execute", side_effect=OperationalError):
136+
with self.assertRaises(ConnectionException):
137+
await client.execute(self.operation)
138+
128139
async def test_fetch_one(self):
129140
async with AiopgDatabaseClient.from_config(self.config) as client:
130141
await client.execute(self.operation)
131142
observed = await client.fetch_one()
132143
self.assertIsInstance(observed, tuple)
133144

134-
async def test_fetch_one_raises(self):
145+
async def test_fetch_one_raises_programming_empty(self):
135146
async with AiopgDatabaseClient.from_config(self.config) as client:
136147
with self.assertRaises(ProgrammingException):
137148
await client.fetch_one()
138149

150+
async def test_fetch_one_raises_programming(self):
151+
async with AiopgDatabaseClient.from_config(self.config) as client:
152+
await client.execute(self.operation)
153+
with patch.object(Cursor, "fetchone", side_effect=ProgrammingError):
154+
with self.assertRaises(ProgrammingException):
155+
await client.fetch_one()
156+
157+
async def test_fetch_one_raises_operational(self):
158+
async with AiopgDatabaseClient.from_config(self.config) as client:
159+
await client.execute(self.operation)
160+
with patch.object(Cursor, "fetchone", side_effect=OperationalError):
161+
with self.assertRaises(ConnectionException):
162+
await client.fetch_one()
163+
139164
async def test_fetch_all(self):
140165
async with AiopgDatabaseClient.from_config(self.config) as client:
141166
await client.execute(self.operation)

0 commit comments

Comments
 (0)