Skip to content

Commit 9c79fdd

Browse files
authored
Fix Engine.release method to release connection in any way (#756)
* Fixes Engine.release method to release connection in any way * Add one more test. Also both exceptions could be raised * Trigger CI * Fix TcpProxy for python 3.6
1 parent 6c14ca7 commit 9c79fdd

File tree

4 files changed

+122
-9
lines changed

4 files changed

+122
-9
lines changed

aiopg/sa/engine.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from ..connection import TIMEOUT
66
from ..utils import _PoolAcquireContextManager, _PoolContextManager
77
from .connection import SAConnection
8-
from .exc import InvalidRequestError
98

109
try:
1110
from sqlalchemy.dialects.postgresql.psycopg2 import (
@@ -169,10 +168,6 @@ async def _acquire(self):
169168
return conn
170169

171170
def release(self, conn):
172-
"""Revert back connection to pool."""
173-
if conn.in_transaction:
174-
raise InvalidRequestError("Cannot release a connection with "
175-
"not finished transaction")
176171
raw = conn.connection
177172
fut = self._pool.release(raw)
178173
return fut

tests/conftest.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,3 +391,77 @@ def warning():
391391
@pytest.fixture
392392
def log():
393393
yield _AssertLogsContext
394+
395+
396+
@pytest.fixture
397+
def tcp_proxy(loop):
398+
proxy = None
399+
400+
async def go(src_port, dst_port):
401+
nonlocal proxy
402+
proxy = TcpProxy(
403+
dst_port=dst_port,
404+
src_port=src_port,
405+
)
406+
await proxy.start()
407+
return proxy
408+
yield go
409+
if proxy is not None:
410+
loop.run_until_complete(proxy.disconnect())
411+
412+
413+
class TcpProxy:
414+
"""
415+
TCP proxy. Allows simulating connection breaks in tests.
416+
"""
417+
MAX_BYTES = 1024
418+
419+
def __init__(self, *, src_port, dst_port):
420+
self.src_host = '127.0.0.1'
421+
self.src_port = src_port
422+
self.dst_host = '127.0.0.1'
423+
self.dst_port = dst_port
424+
self.connections = set()
425+
426+
async def start(self):
427+
return await asyncio.start_server(
428+
self.handle_client,
429+
host=self.src_host,
430+
port=self.src_port,
431+
)
432+
433+
async def disconnect(self):
434+
while self.connections:
435+
writer = self.connections.pop()
436+
writer.close()
437+
if hasattr(writer, "wait_closed"):
438+
await writer.wait_closed()
439+
440+
@staticmethod
441+
async def _pipe(
442+
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
443+
):
444+
try:
445+
while not reader.at_eof():
446+
bytes_read = await reader.read(TcpProxy.MAX_BYTES)
447+
writer.write(bytes_read)
448+
finally:
449+
writer.close()
450+
451+
async def handle_client(
452+
self,
453+
client_reader: asyncio.StreamReader,
454+
client_writer: asyncio.StreamWriter,
455+
):
456+
server_reader, server_writer = await asyncio.open_connection(
457+
host=self.dst_host,
458+
port=self.dst_port
459+
)
460+
461+
self.connections.add(server_writer)
462+
self.connections.add(client_writer)
463+
464+
await asyncio.wait([
465+
self._pipe(server_reader, client_writer),
466+
self._pipe(client_reader, server_writer),
467+
])

tests/test_async_await.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def test_pool_context_manager_timeout(pg_params, loop):
5555
async with aiopg.create_pool(**pg_params, minsize=1,
5656
maxsize=1) as pool:
5757
cursor_ctx = await pool.cursor()
58-
with pytest.warns(ResourceWarning):
58+
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
5959
with cursor_ctx as cursor:
6060
hung_task = cursor.execute('SELECT pg_sleep(10000);')
6161
# start task

tests/test_sa_engine.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22

3+
import psycopg2
34
import pytest
45
from psycopg2.extensions import parse_dsn
56
from sqlalchemy import Column, Integer, MetaData, String, Table
@@ -80,10 +81,10 @@ def test_not_context_manager(engine):
8081
async def test_release_transacted(engine):
8182
conn = await engine.acquire()
8283
tr = await conn.begin()
83-
with pytest.raises(sa.InvalidRequestError):
84-
engine.release(conn)
84+
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
85+
await engine.release(conn)
8586
del tr
86-
await conn.close()
87+
assert conn.closed
8788

8889

8990
def test_timeout(engine):
@@ -147,3 +148,46 @@ async def test_terminate_with_acquired_connections(make_engine):
147148
await engine.wait_closed()
148149

149150
assert conn.closed
151+
152+
153+
async def test_release_after_connection_disconnected_before_select(
154+
tcp_proxy, unused_port, pg_params, make_engine
155+
):
156+
server_port = pg_params["port"]
157+
proxy_port = unused_port()
158+
159+
tcp_proxy = await tcp_proxy(proxy_port, server_port)
160+
engine = await make_engine(port=proxy_port)
161+
162+
with pytest.raises(
163+
(psycopg2.InterfaceError, psycopg2.OperationalError)
164+
):
165+
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
166+
async with engine.acquire() as conn, conn.begin():
167+
await conn.execute('SELECT 1;')
168+
await tcp_proxy.disconnect()
169+
await conn.execute('SELECT 1;')
170+
171+
assert engine.size == 0
172+
173+
174+
async def test_release_after_connection_disconnected_before_begin(
175+
tcp_proxy, unused_port, pg_params, make_engine
176+
):
177+
server_port = pg_params["port"]
178+
proxy_port = unused_port()
179+
180+
tcp_proxy = await tcp_proxy(proxy_port, server_port)
181+
engine = await make_engine(port=proxy_port)
182+
183+
with pytest.raises(
184+
(psycopg2.InterfaceError, psycopg2.OperationalError)
185+
):
186+
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
187+
async with engine.acquire() as conn:
188+
await conn.execute('SELECT 1;')
189+
await tcp_proxy.disconnect()
190+
async with conn.begin():
191+
pytest.fail("Should not be here")
192+
193+
assert engine.size == 0

0 commit comments

Comments
 (0)