Skip to content

Commit 8cea23e

Browse files
thehesiodvir-mir
authored andcommitted
resolve issue 364 (#452)
* attempt to fix issue 364 * enforce one cursor per connection * PEP * add new test * bump version * revert
1 parent 52876e3 commit 8cea23e

File tree

8 files changed

+59
-14
lines changed

8 files changed

+59
-14
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ htmlcov
1010
docs/_build
1111
.tox
1212
.cache
13-
.pytest_cache
13+
.pytest_cache

aiopg/connection.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,11 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
117117
self._cancelling = False
118118
self._cancellation_waiter = None
119119
self._echo = echo
120+
self._conn_cursor = None
120121
self._notifies = asyncio.Queue(loop=loop)
121122
self._weakref = weakref.ref(self)
122123
self._loop.add_reader(self._fileno, self._ready, self._weakref)
124+
123125
if loop.get_debug():
124126
self._source_traceback = traceback.extract_stack(sys._getframe(1))
125127

@@ -264,13 +266,29 @@ def cursor(self, name=None, cursor_factory=None,
264266
*name*, *scrollable* and *withhold* parameters are not supported by
265267
psycopg in asynchronous mode.
266268
269+
NOTE: as of [TODO] any previously created created cursor from this
270+
connection will be closed
267271
"""
272+
self.close_cursor()
273+
268274
self._last_usage = self._loop.time()
269275
coro = self._cursor(name=name, cursor_factory=cursor_factory,
270276
scrollable=scrollable, withhold=withhold,
271277
timeout=timeout)
272278
return _ContextManager(coro)
273279

280+
def cursor_created(self, cursor):
281+
if self._conn_cursor and not self._conn_cursor.closed:
282+
raise Exception("You can only have one cursor per connection")
283+
284+
self._conn_cursor = cursor
285+
286+
def cursor_closed(self, cursor):
287+
if cursor != self._conn_cursor:
288+
raise Exception("You can only have one cursor per connection")
289+
290+
self._conn_cursor = None
291+
274292
@asyncio.coroutine
275293
def _cursor(self, name=None, cursor_factory=None,
276294
scrollable=None, withhold=False, timeout=None):
@@ -281,7 +299,8 @@ def _cursor(self, name=None, cursor_factory=None,
281299
cursor_factory=cursor_factory,
282300
scrollable=scrollable,
283301
withhold=withhold)
284-
return Cursor(self, impl, timeout, self._echo)
302+
cursor = Cursor(self, impl, timeout, self._echo)
303+
return cursor
285304

286305
@asyncio.coroutine
287306
def _cursor_impl(self, name=None, cursor_factory=None,
@@ -303,7 +322,10 @@ def _close(self):
303322
if self._writing:
304323
self._writing = False
305324
self._loop.remove_writer(self._fileno)
325+
326+
self.close_cursor()
306327
self._conn.close()
328+
307329
if self._waiter is not None and not self._waiter.done():
308330
self._waiter.set_exception(
309331
psycopg2.OperationalError("Connection closed"))
@@ -314,6 +336,10 @@ def close(self):
314336
ret.set_result(None)
315337
return ret
316338

339+
def close_cursor(self):
340+
if self._conn_cursor:
341+
self._conn_cursor.close()
342+
317343
@property
318344
def closed(self):
319345
"""Connection status.

aiopg/cursor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ def __init__(self, conn, impl, timeout, echo):
1616
self._echo = echo
1717
self._transaction = Transaction(self, IsolationLevel.repeatable_read)
1818

19+
conn.cursor_created(self)
20+
1921
@property
2022
def echo(self):
2123
"""Return echo mode status."""
@@ -48,7 +50,9 @@ def description(self):
4850

4951
def close(self):
5052
"""Close the cursor now."""
51-
self._impl.close()
53+
if not self.closed:
54+
self._impl.close()
55+
self._conn.cursor_closed(self)
5256

5357
@property
5458
def closed(self):

aiopg/pool.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ def release(self, conn):
257257
if self._closing:
258258
conn.close()
259259
else:
260+
conn.close_cursor() # there may be weak-refs to these cursors
260261
self._free.append(conn)
261262
fut = ensure_future(self._wakeup(), loop=self._loop)
262263
return fut

setup.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66

77
install_requires = ['psycopg2>=2.7.0']
8+
extras_require = {'sa': ['sqlalchemy>=1.1']}
89

910
PY_VER = sys.version_info
1011

@@ -16,9 +17,6 @@ def read(f):
1617
return open(os.path.join(os.path.dirname(__file__), f)).read().strip()
1718

1819

19-
extras_require = {'sa': ['sqlalchemy>=1.1'], }
20-
21-
2220
def read_version():
2321
regexp = re.compile(r"^__version__\W*=\W*'([\d.abrc]+)'")
2422
init_py = os.path.join(os.path.dirname(__file__), 'aiopg', '__init__.py')

tests/pep492/test_async_await.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ async def test_cursor_create_with_context_manager(make_connection):
5454
assert cursor.closed
5555

5656

57+
@asyncio.coroutine
58+
async def test_two_cursor_create_with_context_manager(make_connection):
59+
conn = await make_connection()
60+
61+
async with conn.cursor() as cursor1, conn.cursor() as cursor2:
62+
assert cursor1.closed
63+
assert not cursor2.closed
64+
65+
5766
@asyncio.coroutine
5867
async def test_pool_context_manager_timeout(pg_params, loop):
5968
async with aiopg.create_pool(loop=loop, **pg_params, minsize=1,

tests/test_connection.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from aiopg.connection import Connection, TIMEOUT
1313
from aiopg.cursor import Cursor
14-
from aiopg.utils import ensure_future, create_future
14+
from aiopg.utils import ensure_future
1515
from unittest import mock
1616

1717

@@ -332,6 +332,7 @@ def inner(future, cursor):
332332
fut = asyncio.Future(loop=loop)
333333
conn = yield from connect()
334334
cur = yield from conn.cursor()
335+
335336
task = ensure_future(inner(fut, cur), loop=loop)
336337
yield from fut
337338
yield from asyncio.sleep(0.1, loop=loop)
@@ -345,7 +346,7 @@ def inner(future, cursor):
345346
else:
346347
assert False, "Connection did not start cancelling"
347348

348-
cur = yield from conn.cursor()
349+
# cur = yield from conn.cursor()
349350
with pytest.raises(RuntimeError) as e:
350351
yield from cur.execute('SELECT 1')
351352
assert str(e.value) == ('cursor.execute() called while connection '
@@ -480,15 +481,17 @@ def go():
480481
def test_execute_twice(connect):
481482
conn = yield from connect()
482483
cur1 = yield from conn.cursor()
483-
cur2 = yield from conn.cursor()
484+
# cur2 = yield from conn.cursor()
484485
coro1 = cur1.execute('SELECT 1')
485486
fut1 = next(coro1)
486487
assert isinstance(fut1, asyncio.Future)
487-
coro2 = cur2.execute('SELECT 2')
488+
coro2 = cur1.execute('SELECT 2')
488489

489490
with pytest.raises(RuntimeError):
490491
next(coro2)
491492

493+
yield from conn.cancel()
494+
492495

493496
@asyncio.coroutine
494497
def test_connect_to_unsupported_port(unused_port, loop, pg_params):
@@ -512,20 +515,23 @@ def test_binary_protocol_error(connect):
512515

513516
@asyncio.coroutine
514517
def test_closing_in_separate_task(connect, loop):
515-
event = create_future(loop)
518+
closed_event = asyncio.Event(loop=loop)
519+
exec_created = asyncio.Event(loop=loop)
516520

517521
@asyncio.coroutine
518522
def waiter(conn):
519523
cur = yield from conn.cursor()
520524
fut = cur.execute("SELECT pg_sleep(1000)")
521-
event.set_result(None)
522-
with pytest.raises(psycopg2.OperationalError):
525+
exec_created.set()
526+
yield from closed_event.wait()
527+
with pytest.raises(psycopg2.InterfaceError):
523528
yield from fut
524529

525530
@asyncio.coroutine
526531
def closer(conn):
527-
yield from event
532+
yield from exec_created.wait()
528533
yield from conn.close()
534+
closed_event.set()
529535

530536
conn = yield from connect()
531537
yield from asyncio.gather(waiter(conn), closer(conn),

tests/test_pool.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ def sleep(conn):
499499
yield from sleep(conn)
500500

501501
assert 1 == pool.freesize
502+
502503
with (yield from pool) as conn:
503504
cur = yield from conn.cursor()
504505
yield from cur.execute('SELECT 1;')

0 commit comments

Comments
 (0)