Skip to content

Commit 945561f

Browse files
Optimize connection pool
1 parent e987df2 commit 945561f

File tree

6 files changed

+200
-50
lines changed

6 files changed

+200
-50
lines changed

httpcore/_async/connection_pool.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -238,24 +238,27 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
238238
those connections to be handled seperately.
239239
"""
240240
closing_connections = []
241+
idling_connections = {c for c in self._connections if c.is_idle()}
241242

242243
# First we handle cleaning up any connections that are closed,
243244
# have expired their keep-alive, or surplus idle connections.
244245
for connection in list(self._connections):
245246
if connection.is_closed():
246247
# log: "removing closed connection"
247248
self._connections.remove(connection)
249+
idling_connections.discard(connection)
248250
elif connection.has_expired():
249251
# log: "closing expired connection"
250252
self._connections.remove(connection)
253+
idling_connections.discard(connection)
251254
closing_connections.append(connection)
252255
elif (
253256
connection.is_idle()
254-
and len([connection.is_idle() for connection in self._connections])
255-
> self._max_keepalive_connections
257+
and len(idling_connections) > self._max_keepalive_connections
256258
):
257259
# log: "closing idle connection"
258260
self._connections.remove(connection)
261+
idling_connections.discard(connection)
259262
closing_connections.append(connection)
260263

261264
# Assign queued requests to connections.
@@ -267,9 +270,6 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
267270
for connection in self._connections
268271
if connection.can_handle_request(origin) and connection.is_available()
269272
]
270-
idle_connections = [
271-
connection for connection in self._connections if connection.is_idle()
272-
]
273273

274274
# There are three cases for how we may be able to handle the request:
275275
#
@@ -286,15 +286,18 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
286286
connection = self.create_connection(origin)
287287
self._connections.append(connection)
288288
pool_request.assign_to_connection(connection)
289-
elif idle_connections:
290-
# log: "closing idle connection"
291-
connection = idle_connections[0]
292-
self._connections.remove(connection)
293-
closing_connections.append(connection)
294-
# log: "creating new connection"
295-
connection = self.create_connection(origin)
296-
self._connections.append(connection)
297-
pool_request.assign_to_connection(connection)
289+
else:
290+
idling_connection = next(
291+
(c for c in self._connections if c.is_idle()), None
292+
)
293+
if idling_connection is not None:
294+
# log: "closing idle connection"
295+
self._connections.remove(idling_connection)
296+
closing_connections.append(idling_connection)
297+
# log: "creating new connection"
298+
new_connection = self.create_connection(origin)
299+
self._connections.append(new_connection)
300+
pool_request.assign_to_connection(new_connection)
298301

299302
return closing_connections
300303

httpcore/_async/http11.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import enum
22
import logging
3+
import random
34
import ssl
45
import time
56
from types import TracebackType
@@ -56,10 +57,12 @@ def __init__(
5657
origin: Origin,
5758
stream: AsyncNetworkStream,
5859
keepalive_expiry: Optional[float] = None,
60+
socket_poll_interval_between: Tuple[float, float] = (1, 3),
5961
) -> None:
6062
self._origin = origin
6163
self._network_stream = stream
62-
self._keepalive_expiry: Optional[float] = keepalive_expiry
64+
self._keepalive_expiry = keepalive_expiry
65+
self._socket_poll_interval_between = socket_poll_interval_between
6366
self._expire_at: Optional[float] = None
6467
self._state = HTTPConnectionState.NEW
6568
self._state_lock = AsyncLock()
@@ -68,6 +71,7 @@ def __init__(
6871
our_role=h11.CLIENT,
6972
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
7073
)
74+
self._prev_socket_use_time = time.monotonic()
7175

7276
async def handle_async_request(self, request: Request) -> Response:
7377
if not self.can_handle_request(request.url.origin):
@@ -173,6 +177,7 @@ async def _send_event(
173177
bytes_to_send = self._h11_state.send(event)
174178
if bytes_to_send is not None:
175179
await self._network_stream.write(bytes_to_send, timeout=timeout)
180+
self._prev_socket_use_time = time.monotonic()
176181

177182
# Receiving the response...
178183

@@ -224,6 +229,7 @@ async def _receive_event(
224229
data = await self._network_stream.read(
225230
self.READ_NUM_BYTES, timeout=timeout
226231
)
232+
self._prev_socket_use_time = time.monotonic()
227233

228234
# If we feed this case through h11 we'll raise an exception like:
229235
#
@@ -281,16 +287,27 @@ def is_available(self) -> bool:
281287
def has_expired(self) -> bool:
282288
now = time.monotonic()
283289
keepalive_expired = self._expire_at is not None and now > self._expire_at
290+
if keepalive_expired:
291+
return True
284292

285293
# If the HTTP connection is idle but the socket is readable, then the
286294
# only valid state is that the socket is about to return b"", indicating
287295
# a server-initiated disconnect.
288-
server_disconnected = (
289-
self._state == HTTPConnectionState.IDLE
290-
and self._network_stream.get_extra_info("is_readable")
291-
)
296+
# Checking the readable status is relatively expensive so check it at a lower frequency.
297+
if (now - self._prev_socket_use_time) > self._socket_poll_interval():
298+
self._prev_socket_use_time = now
299+
server_disconnected = (
300+
self._state == HTTPConnectionState.IDLE
301+
and self._network_stream.get_extra_info("is_readable")
302+
)
303+
return server_disconnected
304+
else:
305+
return False
292306

293-
return keepalive_expired or server_disconnected
307+
def _socket_poll_interval(self) -> float:
308+
# Randomize to avoid polling for all the connections at once
309+
low, high = self._socket_poll_interval_between
310+
return random.uniform(low, high)
294311

295312
def is_idle(self) -> bool:
296313
return self._state == HTTPConnectionState.IDLE

httpcore/_sync/connection_pool.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -238,24 +238,27 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
238238
those connections to be handled seperately.
239239
"""
240240
closing_connections = []
241+
idling_connections = {c for c in self._connections if c.is_idle()}
241242

242243
# First we handle cleaning up any connections that are closed,
243244
# have expired their keep-alive, or surplus idle connections.
244245
for connection in list(self._connections):
245246
if connection.is_closed():
246247
# log: "removing closed connection"
247248
self._connections.remove(connection)
249+
idling_connections.discard(connection)
248250
elif connection.has_expired():
249251
# log: "closing expired connection"
250252
self._connections.remove(connection)
253+
idling_connections.discard(connection)
251254
closing_connections.append(connection)
252255
elif (
253256
connection.is_idle()
254-
and len([connection.is_idle() for connection in self._connections])
255-
> self._max_keepalive_connections
257+
and len(idling_connections) > self._max_keepalive_connections
256258
):
257259
# log: "closing idle connection"
258260
self._connections.remove(connection)
261+
idling_connections.discard(connection)
259262
closing_connections.append(connection)
260263

261264
# Assign queued requests to connections.
@@ -267,9 +270,6 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
267270
for connection in self._connections
268271
if connection.can_handle_request(origin) and connection.is_available()
269272
]
270-
idle_connections = [
271-
connection for connection in self._connections if connection.is_idle()
272-
]
273273

274274
# There are three cases for how we may be able to handle the request:
275275
#
@@ -286,15 +286,18 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
286286
connection = self.create_connection(origin)
287287
self._connections.append(connection)
288288
pool_request.assign_to_connection(connection)
289-
elif idle_connections:
290-
# log: "closing idle connection"
291-
connection = idle_connections[0]
292-
self._connections.remove(connection)
293-
closing_connections.append(connection)
294-
# log: "creating new connection"
295-
connection = self.create_connection(origin)
296-
self._connections.append(connection)
297-
pool_request.assign_to_connection(connection)
289+
else:
290+
idling_connection = next(
291+
(c for c in self._connections if c.is_idle()), None
292+
)
293+
if idling_connection is not None:
294+
# log: "closing idle connection"
295+
self._connections.remove(idling_connection)
296+
closing_connections.append(idling_connection)
297+
# log: "creating new connection"
298+
new_connection = self.create_connection(origin)
299+
self._connections.append(new_connection)
300+
pool_request.assign_to_connection(new_connection)
298301

299302
return closing_connections
300303

httpcore/_sync/http11.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import enum
22
import logging
3+
import random
34
import ssl
45
import time
56
from types import TracebackType
@@ -56,10 +57,12 @@ def __init__(
5657
origin: Origin,
5758
stream: NetworkStream,
5859
keepalive_expiry: Optional[float] = None,
60+
socket_poll_interval_between: Tuple[float, float] = (1, 3),
5961
) -> None:
6062
self._origin = origin
6163
self._network_stream = stream
62-
self._keepalive_expiry: Optional[float] = keepalive_expiry
64+
self._keepalive_expiry = keepalive_expiry
65+
self._socket_poll_interval_between = socket_poll_interval_between
6366
self._expire_at: Optional[float] = None
6467
self._state = HTTPConnectionState.NEW
6568
self._state_lock = Lock()
@@ -68,6 +71,7 @@ def __init__(
6871
our_role=h11.CLIENT,
6972
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
7073
)
74+
self._prev_socket_use_time = time.monotonic()
7175

7276
def handle_request(self, request: Request) -> Response:
7377
if not self.can_handle_request(request.url.origin):
@@ -173,6 +177,7 @@ def _send_event(
173177
bytes_to_send = self._h11_state.send(event)
174178
if bytes_to_send is not None:
175179
self._network_stream.write(bytes_to_send, timeout=timeout)
180+
self._prev_socket_use_time = time.monotonic()
176181

177182
# Receiving the response...
178183

@@ -224,6 +229,7 @@ def _receive_event(
224229
data = self._network_stream.read(
225230
self.READ_NUM_BYTES, timeout=timeout
226231
)
232+
self._prev_socket_use_time = time.monotonic()
227233

228234
# If we feed this case through h11 we'll raise an exception like:
229235
#
@@ -281,16 +287,27 @@ def is_available(self) -> bool:
281287
def has_expired(self) -> bool:
282288
now = time.monotonic()
283289
keepalive_expired = self._expire_at is not None and now > self._expire_at
290+
if keepalive_expired:
291+
return True
284292

285293
# If the HTTP connection is idle but the socket is readable, then the
286294
# only valid state is that the socket is about to return b"", indicating
287295
# a server-initiated disconnect.
288-
server_disconnected = (
289-
self._state == HTTPConnectionState.IDLE
290-
and self._network_stream.get_extra_info("is_readable")
291-
)
296+
# Checking the readable status is relatively expensive so check it at a lower frequency.
297+
if (now - self._prev_socket_use_time) > self._socket_poll_interval():
298+
self._prev_socket_use_time = now
299+
server_disconnected = (
300+
self._state == HTTPConnectionState.IDLE
301+
and self._network_stream.get_extra_info("is_readable")
302+
)
303+
return server_disconnected
304+
else:
305+
return False
292306

293-
return keepalive_expired or server_disconnected
307+
def _socket_poll_interval(self) -> float:
308+
# Randomize to avoid polling for all the connections at once
309+
low, high = self._socket_poll_interval_between
310+
return random.uniform(low, high)
294311

295312
def is_idle(self) -> bool:
296313
return self._state == HTTPConnectionState.IDLE

0 commit comments

Comments
 (0)