Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions httpcore/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ def __eq__(self, other: typing.Any) -> bool:
and self.host == other.host
and self.port == other.port
)

def __hash__(self) -> int:
return hash((self.scheme, self.host, self.port))

def __str__(self) -> str:
scheme = self.scheme.decode("ascii")
Expand Down
141 changes: 97 additions & 44 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self, request: Request) -> None:
self.request = request
self.connection: ConnectionInterface | None = None
self._connection_acquired = Event()
self._closed = False

def assign_to_connection(self, connection: ConnectionInterface | None) -> None:
self.connection = connection
Expand All @@ -39,6 +40,12 @@ def wait_for_connection(
def is_queued(self) -> bool:
return self.connection is None

def close(self) -> None:
self._closed = True

def is_closed(self) -> bool:
return self._closed


class ConnectionPool(RequestInterface):
"""
Expand Down Expand Up @@ -246,10 +253,11 @@ def handle_request(self, request: Request) -> Response:
break # pragma: nocover

except BaseException as exc:
# For any exception or cancellation we close the request and remove it from
# the queue, and then re-assign requests to connections.
pool_request.close()
with self._optional_thread_lock:
# For any exception or cancellation we remove the request from
# the queue, and then re-assign requests to connections.
self._requests.remove(pool_request)
# The request is removed from the queue in _assign_requests_to_connections().
closing = self._assign_requests_to_connections()

self._close_connections(closing)
Expand Down Expand Up @@ -279,63 +287,97 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
"""
closing_connections = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
# Local reference with copy-on-write behavior for efficiency.
new_connections = self._connections
if self._max_keepalive_connections < self._max_connections:
# Proactively purge surplus idle connections without creating a copy
# unless we actually need to drop something.
idle_kept = 0
for i, connection in enumerate(self._connections):
if connection.is_idle():
if idle_kept < self._max_keepalive_connections:
idle_kept += 1
if new_connections is not self._connections:
new_connections.append(connection)
else:
# Surplus idle connection -> mark for closing, exclude.
if new_connections is self._connections:
new_connections = self._connections[:i]
closing_connections.append(connection)
else:
if new_connections is not self._connections:
new_connections.append(connection)

if new_connections is not self._connections:
self._connections = new_connections

# Assign queued requests to connections. Remove closed requests inline.
i_req = 0
while i_req < len(self._requests):
pool_request = self._requests[i_req]
if pool_request.is_closed():
self._requests.pop(i_req)
continue
if not pool_request.is_queued():
i_req += 1
continue
origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

first_available_connection: ConnectionInterface | None = None
first_idle_connection: ConnectionInterface | None = None

# Prepare available/idle connections while purging closed/expired ones.
# Iterate in-place using index to safely remove while scanning.
i = 0
while i < len(self._connections):
connection = self._connections[i]

if connection.is_closed():
# log: "removing closed connection"
self._connections.pop(i)
continue
if connection.has_expired():
# log: "closing expired connection"
closing_connections.append(connection)
self._connections.pop(i)
continue

if first_available_connection is None and connection.can_handle_request(origin) and connection.is_available():
first_available_connection = connection

if first_idle_connection is None and connection.is_idle():
first_idle_connection = connection

i += 1

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
if first_available_connection is not None:
# log: "reusing existing connection"
connection = available_connections[0]
connection = first_available_connection
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
elif first_idle_connection is not None:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
connection = first_idle_connection
if connection in self._connections:
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)

i_req += 1

return closing_connections

def _close_connections(self, closing: list[ConnectionInterface]) -> None:
Expand Down Expand Up @@ -412,9 +454,20 @@ def close(self) -> None:
with ShieldCancellation():
if hasattr(self._stream, "close"):
self._stream.close()

with self._pool._optional_thread_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()

self._pool._close_connections(closing)

# The request is removed from the queue in _assign_requests_to_connections().
self._pool_request.close()

need_assign = self._pool._max_keepalive_connections < self._pool._max_connections
if not need_assign:
for r in self._pool._requests:
if r.is_queued():
need_assign = True
break

if need_assign:
with self._pool._optional_thread_lock:
closing = self._pool._assign_requests_to_connections()

if closing:
self._pool._close_connections(closing)
Loading