Skip to content

Commit cc4bc79

Browse files
author
Sergio García Prado
committed
ISSUE #367
* Add tests for `AiopgDatabaseClient`.
1 parent f5540ad commit cc4bc79

File tree

3 files changed

+210
-22
lines changed

3 files changed

+210
-22
lines changed

packages/core/minos-microservice-common/minos/common/database/clients/aiopg.py

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242

4343
class AiopgDatabaseClient(DatabaseClient):
44-
"""TODO"""
44+
"""Aiopg Database Client class."""
4545

4646
_connection: Optional[Connection]
4747
_cursor: Optional[Cursor]
@@ -106,8 +106,11 @@ async def _close_connection(self):
106106
self._connection = None
107107
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
108108

109-
async def is_valid(self):
110-
"""TODO"""
109+
async def is_valid(self) -> bool:
110+
"""Check if the instance is valid.
111+
112+
:return: ``True`` if it is valid or ``False`` otherwise.
113+
"""
111114
if self._connection is None:
112115
return False
113116

@@ -120,43 +123,32 @@ async def is_valid(self):
120123
return not self._connection.closed
121124

122125
async def reset(self, **kwargs) -> None:
123-
"""TODO"""
124-
await self._destroy_cursor(**kwargs)
126+
"""Reset the current instance status.
125127
126-
@property
127-
def notifications(self) -> ClosableQueue:
128-
return self._connection.notifies
128+
:param kwargs: Additional named parameters.
129+
:return: This method does not return anything.
130+
"""
131+
await self._destroy_cursor(**kwargs)
129132

130133
# noinspection PyUnusedLocal
131134
async def fetch_all(
132135
self,
133136
*args,
134137
timeout: Optional[float] = None,
135138
lock: Optional[int] = None,
136-
streaming_mode: bool = False,
137139
**kwargs,
138140
) -> AsyncIterator[tuple]:
139141
"""Submit a SQL query and return an asynchronous iterator.
140142
141143
:param timeout: An optional timeout.
142144
:param lock: Optional key to perform the query with locking. If not set, the query is performed without any
143145
lock.
144-
:param streaming_mode: If ``True`` the data fetching is performed in streaming mode, that is iterating over the
145-
cursor and yielding once a time (requires an opening connection to do that). Otherwise, all the data is
146-
fetched and keep in memory before yielding it.
147146
:param kwargs: Additional named arguments.
148147
:return: This method does not return anything.
149148
"""
150149
await self._create_cursor()
151150

152-
if streaming_mode:
153-
async for row in self._cursor:
154-
yield row
155-
return
156-
157-
rows = await self._cursor.fetchall()
158-
159-
for row in rows:
151+
async for row in self._cursor:
160152
yield row
161153

162154
# noinspection PyUnusedLocal
@@ -183,7 +175,11 @@ async def _create_cursor(self, *args, lock: Optional[Hashable] = None, **kwargs)
183175
if self._cursor is None:
184176
self._cursor = await self._connection.cursor(*args, **kwargs)
185177

186-
if lock is not None and self._lock is None:
178+
if lock is not None:
179+
if self._lock is not None:
180+
if self._lock.key != lock:
181+
raise ValueError(f"Only one lock per instance is supported. Currently locked with {self._lock!r}")
182+
return
187183
from ..locks import (
188184
DatabaseLock,
189185
)
@@ -197,10 +193,18 @@ async def _destroy_cursor(self, **kwargs):
197193
self._lock = None
198194

199195
if self._cursor is not None:
200-
if self._cursor.closed:
196+
if not self._cursor.closed:
201197
self._cursor.close()
202198
self._cursor = None
203199

200+
@property
201+
def lock(self) -> Optional[DatabaseLock]:
202+
"""Get the lock.
203+
204+
:return: A ``DatabaseLock`` instance.
205+
"""
206+
return self._lock
207+
204208
@property
205209
def cursor(self) -> Optional[Cursor]:
206210
"""Get the cursor.
@@ -209,6 +213,14 @@ def cursor(self) -> Optional[Cursor]:
209213
"""
210214
return self._cursor
211215

216+
@property
217+
def notifications(self) -> ClosableQueue:
218+
"""Get the notifications queue.
219+
220+
:return: A ``ClosableQueue`` instance.
221+
"""
222+
return self._connection.notifies
223+
212224
@property
213225
def connection(self) -> Optional[Connection]:
214226
"""Get the connection.

packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/__init__.py

Whitespace-only changes.
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import unittest
2+
from unittest.mock import (
3+
PropertyMock,
4+
call,
5+
patch,
6+
)
7+
8+
import aiopg
9+
from aiopg import (
10+
Connection,
11+
Cursor,
12+
)
13+
from psycopg2 import (
14+
IntegrityError,
15+
OperationalError,
16+
)
17+
18+
from minos.common import (
19+
AiopgDatabaseClient,
20+
DatabaseLock,
21+
IntegrityException,
22+
UnableToConnectException,
23+
)
24+
from minos.common.testing import (
25+
PostgresAsyncTestCase,
26+
)
27+
from tests.utils import (
28+
CommonTestCase,
29+
)
30+
31+
32+
# noinspection SqlNoDataSourceInspection
33+
class TestAiopgDatabaseClient(CommonTestCase, PostgresAsyncTestCase):
34+
def setUp(self):
35+
super().setUp()
36+
self.sql = "SELECT * FROM information_schema.tables"
37+
38+
def test_constructor(self):
39+
client = AiopgDatabaseClient("foo")
40+
self.assertEqual("foo", client.database)
41+
self.assertEqual("postgres", client.user)
42+
self.assertEqual("", client.password)
43+
self.assertEqual("localhost", client.host)
44+
self.assertEqual(5432, client.port)
45+
46+
def test_from_config(self):
47+
default_database = self.config.get_default_database()
48+
client = AiopgDatabaseClient.from_config(self.config)
49+
self.assertEqual(default_database["database"], client.database)
50+
self.assertEqual(default_database["user"], client.user)
51+
self.assertEqual(default_database["password"], client.password)
52+
self.assertEqual(default_database["host"], client.host)
53+
self.assertEqual(default_database["port"], client.port)
54+
55+
async def test_is_valid_true(self):
56+
async with AiopgDatabaseClient.from_config(self.config) as client:
57+
self.assertTrue(client.is_valid())
58+
59+
async def test_is_valid_false_not_setup(self):
60+
client = AiopgDatabaseClient.from_config(self.config)
61+
self.assertFalse(await client.is_valid())
62+
63+
async def test_is_valid_false_operational_error(self):
64+
async with AiopgDatabaseClient.from_config(self.config) as client:
65+
with patch.object(Connection, "isolation_level", new_callable=PropertyMock, side_effect=OperationalError):
66+
self.assertFalse(await client.is_valid())
67+
68+
async def test_is_valid_false_closed(self):
69+
async with AiopgDatabaseClient.from_config(self.config) as client:
70+
with patch.object(Connection, "closed", new_callable=PropertyMock, return_valud=False):
71+
self.assertFalse(await client.is_valid())
72+
73+
async def test_connection(self):
74+
client = AiopgDatabaseClient.from_config(self.config)
75+
self.assertIsNone(client.connection)
76+
async with client:
77+
self.assertIsInstance(client.connection, Connection)
78+
self.assertIsNone(client.connection)
79+
80+
async def test_connection_raises(self):
81+
with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=OperationalError):
82+
with self.assertRaises(UnableToConnectException):
83+
async with AiopgDatabaseClient.from_config(self.config):
84+
pass
85+
86+
async def test_notifications(self):
87+
async with AiopgDatabaseClient.from_config(self.config) as client:
88+
self.assertEqual(client.connection.notifies, client.notifications)
89+
90+
async def test_cursor(self):
91+
client = AiopgDatabaseClient.from_config(self.config)
92+
self.assertIsNone(client.cursor)
93+
async with client:
94+
self.assertIsNone(client.cursor)
95+
await client.execute("SELECT * FROM information_schema.tables")
96+
self.assertIsInstance(client.cursor, Cursor)
97+
98+
self.assertIsNone(client.cursor)
99+
100+
async def test_cursor_reset(self):
101+
client = AiopgDatabaseClient.from_config(self.config)
102+
async with client:
103+
await client.execute("SELECT * FROM information_schema.tables")
104+
self.assertIsInstance(client.cursor, Cursor)
105+
await client.reset()
106+
self.assertIsNone(client.cursor)
107+
108+
async def test_lock(self):
109+
client = AiopgDatabaseClient.from_config(self.config)
110+
self.assertIsNone(client.lock)
111+
async with client:
112+
self.assertIsNone(client.lock)
113+
await client.execute(self.sql, lock="foo")
114+
self.assertIsInstance(client.lock, DatabaseLock)
115+
116+
self.assertIsNone(client.lock)
117+
118+
async def test_lock_reset(self):
119+
async with AiopgDatabaseClient.from_config(self.config) as client:
120+
await client.execute(self.sql, lock="foo")
121+
self.assertIsInstance(client.lock, DatabaseLock)
122+
await client.reset()
123+
self.assertIsNone(client.lock)
124+
125+
async def test_execute(self):
126+
async with AiopgDatabaseClient.from_config(self.config) as client:
127+
with patch.object(Cursor, "execute") as execute_mock:
128+
await client.execute(self.sql)
129+
self.assertEqual(
130+
[call(operation=self.sql, parameters=None, timeout=None)],
131+
execute_mock.call_args_list,
132+
)
133+
134+
async def test_execute_with_lock(self):
135+
with patch.object(DatabaseLock, "__aenter__") as enter_lock_mock:
136+
with patch.object(DatabaseLock, "__aexit__") as exit_lock_mock:
137+
async with AiopgDatabaseClient.from_config(self.config) as client:
138+
await client.execute(self.sql, lock="foo")
139+
self.assertEqual(1, enter_lock_mock.call_count)
140+
self.assertEqual(0, exit_lock_mock.call_count)
141+
enter_lock_mock.reset_mock()
142+
exit_lock_mock.reset_mock()
143+
self.assertEqual(0, enter_lock_mock.call_count)
144+
self.assertEqual(1, exit_lock_mock.call_count)
145+
146+
async def test_execute_with_lock_multiple(self):
147+
async with AiopgDatabaseClient.from_config(self.config) as client:
148+
await client.execute(self.sql, lock="foo")
149+
await client.execute(self.sql, lock="foo")
150+
with self.assertRaises(ValueError):
151+
await client.execute(self.sql, lock="bar")
152+
153+
async def test_execute_raises_integrity(self):
154+
async with AiopgDatabaseClient.from_config(self.config) as client:
155+
with patch.object(Cursor, "execute", side_effect=IntegrityError):
156+
with self.assertRaises(IntegrityException):
157+
await client.execute(self.sql)
158+
159+
async def test_fetch_one(self):
160+
async with AiopgDatabaseClient.from_config(self.config) as client:
161+
await client.execute(self.sql)
162+
observed = await client.fetch_one()
163+
self.assertIsInstance(observed, tuple)
164+
165+
async def test_fetch_all(self):
166+
async with AiopgDatabaseClient.from_config(self.config) as client:
167+
await client.execute(self.sql)
168+
observed = [value async for value in client.fetch_all()]
169+
170+
self.assertGreater(len(observed), 0)
171+
for obs in observed:
172+
self.assertIsInstance(obs, tuple)
173+
174+
175+
if __name__ == "__main__":
176+
unittest.main()

0 commit comments

Comments
 (0)