Skip to content

Commit a15efa1

Browse files
themyloginasvetlov
authored andcommitted
Fix #216 (#223)
* Fix #216 * Make test_cancelled_connection_is_usable_asap more readable Pass cur and fut explicitly as arguments, so it is easier to see where they come from * Test that cancelled connection is not usable until cancellation is done
1 parent 891f007 commit a15efa1

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

aiopg/connection.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
113113
self._timeout = timeout
114114
self._waiter = waiter
115115
self._writing = False
116+
self._cancelling = False
117+
self._cancellation_waiter = None
116118
self._echo = echo
117119
self._notifies = asyncio.Queue(loop=loop)
118120
self._weakref = weakref.ref(self)
@@ -199,8 +201,14 @@ def _fatal_error(self, message):
199201

200202
def _create_waiter(self, func_name):
201203
if self._waiter is not None:
202-
raise RuntimeError('%s() called while another coroutine is '
203-
'already waiting for incoming data' % func_name)
204+
if self._cancelling:
205+
if not self._waiter.done():
206+
raise RuntimeError('%s() called while connection is '
207+
'being cancelled' % func_name)
208+
else:
209+
raise RuntimeError('%s() called while another coroutine is '
210+
'already waiting for incoming '
211+
'data' % func_name)
204212
self._waiter = create_future(self._loop)
205213
return self._waiter
206214

@@ -212,6 +220,8 @@ def _poll(self, waiter, timeout):
212220
@asyncio.coroutine
213221
def cancel():
214222
self._waiter = create_future(self._loop)
223+
self._cancelling = True
224+
self._cancellation_waiter = self._waiter
215225
self._conn.cancel()
216226
if not self._conn.isexecuting():
217227
return
@@ -231,7 +241,13 @@ def cancel():
231241
except psycopg2.extensions.QueryCanceledError:
232242
raise asyncio.CancelledError
233243
finally:
234-
self._waiter = None
244+
if self._cancelling:
245+
self._cancelling = False
246+
if self._waiter is self._cancellation_waiter:
247+
self._waiter = None
248+
self._cancellation_waiter = None
249+
else:
250+
self._waiter = None
235251

236252
def _isexecuting(self):
237253
return self._conn.isexecuting()

tests/test_connection.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import gc
44
import psycopg2
55
import psycopg2.extras
6+
import psycopg2.extensions
67
import pytest
78
import socket
89
import time
@@ -287,6 +288,65 @@ def inner():
287288
yield from task
288289

289290

291+
@asyncio.coroutine
292+
def test_cancelled_connection_is_usable_asap(connect, loop):
293+
@asyncio.coroutine
294+
def inner(future, cursor):
295+
future.set_result(None)
296+
yield from cursor.execute("SELECT pg_sleep(10)")
297+
298+
fut = asyncio.Future(loop=loop)
299+
conn = yield from connect()
300+
cur = yield from conn.cursor()
301+
task = ensure_future(inner(fut, cur), loop=loop)
302+
yield from fut
303+
yield from asyncio.sleep(0.1, loop=loop)
304+
305+
task.cancel()
306+
307+
for tick in range(100):
308+
yield from asyncio.sleep(0, loop=loop)
309+
status = conn._conn.get_transaction_status()
310+
if status == psycopg2.extensions.TRANSACTION_STATUS_IDLE:
311+
cur = yield from conn.cursor()
312+
yield from cur.execute("SELECT 1")
313+
ret = yield from cur.fetchone()
314+
assert (1,) == ret
315+
break
316+
else:
317+
assert False, "Cancelled connection transaction status never got idle"
318+
319+
320+
@asyncio.coroutine
321+
def test_cancelled_connection_is_not_usable_until_cancellation(connect, loop):
322+
@asyncio.coroutine
323+
def inner(future, cursor):
324+
future.set_result(None)
325+
yield from cursor.execute("SELECT pg_sleep(10)")
326+
327+
fut = asyncio.Future(loop=loop)
328+
conn = yield from connect()
329+
cur = yield from conn.cursor()
330+
task = ensure_future(inner(fut, cur), loop=loop)
331+
yield from fut
332+
yield from asyncio.sleep(0.1, loop=loop)
333+
334+
task.cancel()
335+
336+
for i in range(100):
337+
yield from asyncio.sleep(0)
338+
if conn._cancelling:
339+
break
340+
else:
341+
assert False, "Connection did not start cancelling"
342+
343+
cur = yield from conn.cursor()
344+
with pytest.raises(RuntimeError) as e:
345+
yield from cur.execute('SELECT 1')
346+
assert str(e.value) == ('cursor.execute() called while connection '
347+
'is being cancelled')
348+
349+
290350
@asyncio.coroutine
291351
def test_close2(connect, loop):
292352
conn = yield from connect()

0 commit comments

Comments
 (0)