Skip to content

Commit 79437cd

Browse files
authored
[PR #9672/afb5ebb backport][3.11] Reuse the oldest keep-alive connection first (#9680)
1 parent 130ca4d commit 79437cd

File tree

4 files changed

+71
-51
lines changed

4 files changed

+71
-51
lines changed

CHANGES/9672.bugfix.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed the keep-alive connection pool to be FIFO instead of LIFO -- by :user:`bdraco`.
2+
3+
Keep-alive connections are more likely to be reused before they disconnect.

aiohttp/connector.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import sys
66
import traceback
77
import warnings
8-
from collections import OrderedDict, defaultdict
8+
from collections import OrderedDict, defaultdict, deque
99
from contextlib import suppress
1010
from http import HTTPStatus
1111
from itertools import chain, cycle, islice
@@ -17,6 +17,7 @@
1717
Awaitable,
1818
Callable,
1919
DefaultDict,
20+
Deque,
2021
Dict,
2122
Iterator,
2223
List,
@@ -255,7 +256,12 @@ def __init__(
255256
if loop.get_debug():
256257
self._source_traceback = traceback.extract_stack(sys._getframe(1))
257258

258-
self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
259+
# Connection pool of reusable connections.
260+
# We use a deque to store connections because it has O(1) popleft()
261+
# and O(1) append() operations to implement a FIFO queue.
262+
self._conns: DefaultDict[
263+
ConnectionKey, Deque[Tuple[ResponseHandler, float]]
264+
] = defaultdict(deque)
259265
self._limit = limit
260266
self._limit_per_host = limit_per_host
261267
self._acquired: Set[ResponseHandler] = set()
@@ -362,10 +368,10 @@ def _cleanup(self) -> None:
362368
timeout = self._keepalive_timeout
363369

364370
if self._conns:
365-
connections = {}
371+
connections = defaultdict(deque)
366372
deadline = now - timeout
367373
for key, conns in self._conns.items():
368-
alive: List[Tuple[ResponseHandler, float]] = []
374+
alive: Deque[Tuple[ResponseHandler, float]] = deque()
369375
for proto, use_time in conns:
370376
if proto.is_connected() and use_time - deadline >= 0:
371377
alive.append((proto, use_time))
@@ -590,14 +596,12 @@ async def _get(
590596
591597
The connection will be marked as acquired.
592598
"""
593-
try:
594-
conns = self._conns[key]
595-
except KeyError:
599+
if (conns := self._conns.get(key)) is None:
596600
return None
597601

598602
t1 = monotonic()
599603
while conns:
600-
proto, t0 = conns.pop()
604+
proto, t0 = conns.popleft()
601605
# We will we reuse the connection if its connected and
602606
# the keepalive timeout has not been exceeded
603607
if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
@@ -689,10 +693,7 @@ def _release(
689693
self._cleanup_closed_transports.append(transport)
690694
return
691695

692-
conns = self._conns.get(key)
693-
if conns is None:
694-
conns = self._conns[key] = []
695-
conns.append((protocol, monotonic()))
696+
self._conns[key].append((protocol, monotonic()))
696697

697698
if self._cleanup_handle is None:
698699
self._cleanup_handle = helpers.weakref_handle(

tests/test_client_session.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import gc
44
import io
55
import json
6+
from collections import deque
67
from http.cookies import SimpleCookie
78
from typing import Any, Awaitable, Callable, List
89
from unittest import mock
@@ -32,7 +33,7 @@ async def make_conn():
3233

3334
conn = loop.run_until_complete(make_conn())
3435
proto = mock.Mock()
35-
conn._conns["a"] = [(proto, 123)]
36+
conn._conns["a"] = deque([(proto, 123)])
3637
yield conn
3738
loop.run_until_complete(conn.close())
3839

tests/test_connector.py

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import ssl
88
import sys
99
import uuid
10+
from collections import defaultdict, deque
1011
from concurrent import futures
1112
from contextlib import closing, suppress
12-
from typing import Any, List, Literal, Optional, Sequence, Tuple
13+
from typing import Any, DefaultDict, Deque, List, Literal, Optional, Sequence, Tuple
1314
from unittest import mock
1415

1516
import pytest
@@ -194,7 +195,7 @@ async def test_del_with_scheduled_cleanup(loop) -> None:
194195
loop.set_debug(True)
195196
conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=0.01)
196197
transp = mock.Mock()
197-
conn._conns["a"] = [(transp, 123)]
198+
conn._conns["a"] = deque([(transp, 123)])
198199

199200
conns_impl = conn._conns
200201
exc_handler = mock.Mock()
@@ -224,7 +225,7 @@ async def make_conn():
224225

225226
conn = loop.run_until_complete(make_conn())
226227
transp = mock.Mock()
227-
conn._conns["a"] = [(transp, 123)]
228+
conn._conns["a"] = deque([(transp, 123)])
228229

229230
conns_impl = conn._conns
230231
exc_handler = mock.Mock()
@@ -283,7 +284,7 @@ async def test_close(loop) -> None:
283284

284285
conn = aiohttp.BaseConnector(loop=loop)
285286
assert not conn.closed
286-
conn._conns[("host", 8080, False)] = [(proto, object())]
287+
conn._conns[("host", 8080, False)] = deque([(proto, object())])
287288
await conn.close()
288289

289290
assert not conn._conns
@@ -296,7 +297,7 @@ async def test_get(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
296297
assert await conn._get(key, []) is None
297298

298299
proto = create_mocked_conn(loop)
299-
conn._conns[key] = [(proto, loop.time())]
300+
conn._conns[key] = deque([(proto, loop.time())])
300301
connection = await conn._get(key, [])
301302
assert connection is not None
302303
assert connection.protocol == proto
@@ -310,14 +311,14 @@ async def test_get_unconnected_proto(loop) -> None:
310311
assert await conn._get(key, []) is None
311312

312313
proto = create_mocked_conn(loop)
313-
conn._conns[key] = [(proto, loop.time())]
314+
conn._conns[key] = deque([(proto, loop.time())])
314315
connection = await conn._get(key, [])
315316
assert connection is not None
316317
assert connection.protocol == proto
317318
connection.close()
318319

319320
assert await conn._get(key, []) is None
320-
conn._conns[key] = [(proto, loop.time())]
321+
conn._conns[key] = deque([(proto, loop.time())])
321322
proto.is_connected = lambda *args: False
322323
assert await conn._get(key, []) is None
323324
await conn.close()
@@ -329,14 +330,14 @@ async def test_get_unconnected_proto_ssl(loop) -> None:
329330
assert await conn._get(key, []) is None
330331

331332
proto = create_mocked_conn(loop)
332-
conn._conns[key] = [(proto, loop.time())]
333+
conn._conns[key] = deque([(proto, loop.time())])
333334
connection = await conn._get(key, [])
334335
assert connection is not None
335336
assert connection.protocol == proto
336337
connection.close()
337338

338339
assert await conn._get(key, []) is None
339-
conn._conns[key] = [(proto, loop.time())]
340+
conn._conns[key] = deque([(proto, loop.time())])
340341
proto.is_connected = lambda *args: False
341342
assert await conn._get(key, []) is None
342343
await conn.close()
@@ -348,7 +349,7 @@ async def test_get_expired(loop: asyncio.AbstractEventLoop) -> None:
348349
assert await conn._get(key, []) is None
349350

350351
proto = mock.Mock()
351-
conn._conns[key] = [(proto, loop.time() - 1000)]
352+
conn._conns[key] = deque([(proto, loop.time() - 1000)])
352353
assert await conn._get(key, []) is None
353354
assert not conn._conns
354355
await conn.close()
@@ -361,7 +362,7 @@ async def test_get_expired_ssl(loop: asyncio.AbstractEventLoop) -> None:
361362

362363
proto = mock.Mock()
363364
transport = proto.transport
364-
conn._conns[key] = [(proto, loop.time() - 1000)]
365+
conn._conns[key] = deque([(proto, loop.time() - 1000)])
365366
assert await conn._get(key, []) is None
366367
assert not conn._conns
367368
assert conn._cleanup_closed_transports == [transport]
@@ -1411,12 +1412,12 @@ async def test_release_close_do_not_delete_existing_connections(key) -> None:
14111412
proto1 = mock.Mock()
14121413

14131414
conn = aiohttp.BaseConnector()
1414-
conn._conns[key] = [(proto1, 1)]
1415+
conn._conns[key] = deque([(proto1, 1)])
14151416

14161417
proto = mock.Mock(should_close=True)
14171418
conn._acquired.add(proto)
14181419
conn._release(key, proto)
1419-
assert conn._conns[key] == [(proto1, 1)]
1420+
assert conn._conns[key] == deque([(proto1, 1)])
14201421
assert proto.close.called
14211422
await conn.close()
14221423

@@ -1451,7 +1452,7 @@ async def test_connect(loop, key) -> None:
14511452
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)
14521453

14531454
conn = aiohttp.BaseConnector(loop=loop)
1454-
conn._conns[key] = [(proto, loop.time())]
1455+
conn._conns[key] = deque([(proto, loop.time())])
14551456
conn._create_connection = mock.Mock()
14561457
conn._create_connection.return_value = loop.create_future()
14571458
conn._create_connection.return_value.set_result(proto)
@@ -1687,12 +1688,15 @@ async def test_ctor_cleanup() -> None:
16871688
assert conn._cleanup_closed_handle is not None
16881689

16891690

1690-
async def test_cleanup(key) -> None:
1691-
testset = {
1692-
key: [(mock.Mock(), 10), (mock.Mock(), 300)],
1693-
}
1694-
testset[key][0][0].is_connected.return_value = True
1695-
testset[key][1][0].is_connected.return_value = False
1691+
async def test_cleanup(key: ConnectionKey) -> None:
1692+
m1 = mock.Mock()
1693+
m2 = mock.Mock()
1694+
m1.is_connected.return_value = True
1695+
m2.is_connected.return_value = False
1696+
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
1697+
defaultdict(deque)
1698+
)
1699+
testset[key] = deque([(m1, 10), (m2, 300)])
16961700

16971701
loop = mock.Mock()
16981702
loop.time.return_value = 300
@@ -1710,7 +1714,10 @@ async def test_cleanup(key) -> None:
17101714
async def test_cleanup_close_ssl_transport(ssl_key) -> None:
17111715
proto = mock.Mock()
17121716
transport = proto.transport
1713-
testset = {ssl_key: [(proto, 10)]}
1717+
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
1718+
defaultdict(deque)
1719+
)
1720+
testset[ssl_key] = deque([(proto, 10)])
17141721

17151722
loop = mock.Mock()
17161723
new_time = asyncio.get_event_loop().time() + 300
@@ -1727,9 +1734,13 @@ async def test_cleanup_close_ssl_transport(ssl_key) -> None:
17271734
assert conn._cleanup_closed_transports == [transport]
17281735

17291736

1730-
async def test_cleanup2() -> None:
1731-
testset = {1: [(mock.Mock(), 300)]}
1732-
testset[1][0][0].is_connected.return_value = True
1737+
async def test_cleanup2(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
1738+
m = create_mocked_conn()
1739+
m.is_connected.return_value = True
1740+
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
1741+
defaultdict(deque)
1742+
)
1743+
testset[key] = deque([(m, 300)])
17331744

17341745
conn = aiohttp.BaseConnector(keepalive_timeout=10)
17351746
conn._loop = mock.Mock()
@@ -1744,9 +1755,13 @@ async def test_cleanup2() -> None:
17441755
await conn.close()
17451756

17461757

1747-
async def test_cleanup3(key) -> None:
1748-
testset = {key: [(mock.Mock(), 290.1), (mock.Mock(), 305.1)]}
1749-
testset[key][0][0].is_connected.return_value = True
1758+
async def test_cleanup3(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
1759+
m = create_mocked_conn(loop)
1760+
m.is_connected.return_value = True
1761+
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
1762+
defaultdict(deque)
1763+
)
1764+
testset[key] = deque([(m, 290.1), (create_mocked_conn(loop), 305.1)])
17501765

17511766
loop = mock.Mock()
17521767
loop.time.return_value = 308.5
@@ -1757,7 +1772,7 @@ async def test_cleanup3(key) -> None:
17571772
with mock.patch("aiohttp.connector.monotonic", return_value=308.5):
17581773
conn._cleanup()
17591774

1760-
assert conn._conns == {key: [testset[key][1]]}
1775+
assert conn._conns == {key: deque([testset[key][1]])}
17611776

17621777
assert conn._cleanup_handle is not None
17631778
loop.call_at.assert_called_with(319, mock.ANY, mock.ANY)
@@ -1927,7 +1942,7 @@ async def test_close_twice(loop) -> None:
19271942
proto = mock.Mock()
19281943

19291944
conn = aiohttp.BaseConnector(loop=loop)
1930-
conn._conns[1] = [(proto, object())]
1945+
conn._conns[1] = deque([(proto, object())])
19311946
await conn.close()
19321947

19331948
assert not conn._conns
@@ -2324,7 +2339,7 @@ async def test_connect_with_limit(
23242339
)
23252340

23262341
conn = aiohttp.BaseConnector(loop=loop, limit=1)
2327-
conn._conns[key] = [(proto, loop.time())]
2342+
conn._conns[key] = deque([(proto, loop.time())])
23282343
conn._create_connection = mock.Mock()
23292344
conn._create_connection.return_value = loop.create_future()
23302345
conn._create_connection.return_value.set_result(proto)
@@ -2380,7 +2395,7 @@ async def test_connect_queued_operation_tracing(loop, key) -> None:
23802395
)
23812396

23822397
conn = aiohttp.BaseConnector(loop=loop, limit=1)
2383-
conn._conns[key] = [(proto, loop.time())]
2398+
conn._conns[key] = deque([(proto, loop.time())])
23842399
conn._create_connection = mock.Mock()
23852400
conn._create_connection.return_value = loop.create_future()
23862401
conn._create_connection.return_value.set_result(proto)
@@ -2424,7 +2439,7 @@ async def test_connect_reuseconn_tracing(loop, key) -> None:
24242439
)
24252440

24262441
conn = aiohttp.BaseConnector(loop=loop, limit=1)
2427-
conn._conns[key] = [(proto, loop.time())]
2442+
conn._conns[key] = deque([(proto, loop.time())])
24282443
conn2 = await conn.connect(req, traces, ClientTimeout())
24292444
conn2.release()
24302445

@@ -2441,7 +2456,7 @@ async def test_connect_with_limit_and_limit_per_host(loop, key) -> None:
24412456
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)
24422457

24432458
conn = aiohttp.BaseConnector(loop=loop, limit=1000, limit_per_host=1)
2444-
conn._conns[key] = [(proto, loop.time())]
2459+
conn._conns[key] = deque([(proto, loop.time())])
24452460
conn._create_connection = mock.Mock()
24462461
conn._create_connection.return_value = loop.create_future()
24472462
conn._create_connection.return_value.set_result(proto)
@@ -2475,7 +2490,7 @@ async def test_connect_with_no_limit_and_limit_per_host(loop, key) -> None:
24752490
req = ClientRequest("GET", URL("http://localhost1:80"), loop=loop)
24762491

24772492
conn = aiohttp.BaseConnector(loop=loop, limit=0, limit_per_host=1)
2478-
conn._conns[key] = [(proto, loop.time())]
2493+
conn._conns[key] = deque([(proto, loop.time())])
24792494
conn._create_connection = mock.Mock()
24802495
conn._create_connection.return_value = loop.create_future()
24812496
conn._create_connection.return_value.set_result(proto)
@@ -2507,7 +2522,7 @@ async def test_connect_with_no_limits(loop, key) -> None:
25072522
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)
25082523

25092524
conn = aiohttp.BaseConnector(loop=loop, limit=0, limit_per_host=0)
2510-
conn._conns[key] = [(proto, loop.time())]
2525+
conn._conns[key] = deque([(proto, loop.time())])
25112526
conn._create_connection = mock.Mock()
25122527
conn._create_connection.return_value = loop.create_future()
25132528
conn._create_connection.return_value.set_result(proto)
@@ -2541,7 +2556,7 @@ async def test_connect_with_limit_cancelled(loop) -> None:
25412556

25422557
conn = aiohttp.BaseConnector(loop=loop, limit=1)
25432558
key = ("host", 80, False)
2544-
conn._conns[key] = [(proto, loop.time())]
2559+
conn._conns[key] = deque([(proto, loop.time())])
25452560
conn._create_connection = mock.Mock()
25462561
conn._create_connection.return_value = loop.create_future()
25472562
conn._create_connection.return_value.set_result(proto)
@@ -2687,7 +2702,7 @@ async def test_close_with_acquired_connection(loop) -> None:
26872702

26882703
conn = aiohttp.BaseConnector(loop=loop, limit=1)
26892704
key = ("host", 80, False)
2690-
conn._conns[key] = [(proto, loop.time())]
2705+
conn._conns[key] = deque([(proto, loop.time())])
26912706
conn._create_connection = mock.Mock()
26922707
conn._create_connection.return_value = loop.create_future()
26932708
conn._create_connection.return_value.set_result(proto)
@@ -3291,7 +3306,7 @@ async def allow_connection_and_add_dummy_waiter() -> None:
32913306
spec_set=True,
32923307
side_effect=[0, 1, 1, 1],
32933308
):
3294-
connector._conns[key] = [(proto, loop.time())]
3309+
connector._conns[key] = deque([(proto, loop.time())])
32953310
with mock.patch.object(
32963311
connector,
32973312
"_create_connection",

0 commit comments

Comments
 (0)