diff --git a/httpcore/_models.py b/httpcore/_models.py index 8a65f133..7a859112 100644 --- a/httpcore/_models.py +++ b/httpcore/_models.py @@ -171,6 +171,9 @@ def __eq__(self, other: typing.Any) -> bool: and self.host == other.host and self.port == other.port ) + + def __hash__(self) -> int: + return hash((self.scheme, self.host, self.port)) def __str__(self) -> str: scheme = self.scheme.decode("ascii") diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 9ccfa53e..de019020 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -19,6 +19,7 @@ def __init__(self, request: Request) -> None: self.request = request self.connection: ConnectionInterface | None = None self._connection_acquired = Event() + self._closed = False def assign_to_connection(self, connection: ConnectionInterface | None) -> None: self.connection = connection @@ -39,6 +40,12 @@ def wait_for_connection( def is_queued(self) -> bool: return self.connection is None + def close(self) -> None: + self._closed = True + + def is_closed(self) -> bool: + return self._closed + class ConnectionPool(RequestInterface): """ @@ -246,10 +253,11 @@ def handle_request(self, request: Request) -> Response: break # pragma: nocover except BaseException as exc: + # For any exception or cancellation we close the request and remove it from + # the queue, and then re-assign requests to connections. + pool_request.close() with self._optional_thread_lock: - # For any exception or cancellation we remove the request from - # the queue, and then re-assign requests to connections. - self._requests.remove(pool_request) + # The request is removed from the queue in _assign_requests_to_connections(). closing = self._assign_requests_to_connections() self._close_connections(closing) @@ -279,37 +287,68 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: """ closing_connections = [] - # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. - for connection in list(self._connections): - if connection.is_closed(): - # log: "removing closed connection" - self._connections.remove(connection) - elif connection.has_expired(): - # log: "closing expired connection" - self._connections.remove(connection) - closing_connections.append(connection) - elif ( - connection.is_idle() - and len([connection.is_idle() for connection in self._connections]) - > self._max_keepalive_connections - ): - # log: "closing idle connection" - self._connections.remove(connection) - closing_connections.append(connection) - - # Assign queued requests to connections. - queued_requests = [request for request in self._requests if request.is_queued()] - for pool_request in queued_requests: + # Local reference with copy-on-write behavior for efficiency. + new_connections = self._connections + if self._max_keepalive_connections < self._max_connections: + # Proactively purge surplus idle connections without creating a copy + # unless we actually need to drop something. + idle_kept = 0 + for i, connection in enumerate(self._connections): + if connection.is_idle(): + if idle_kept < self._max_keepalive_connections: + idle_kept += 1 + if new_connections is not self._connections: + new_connections.append(connection) + else: + # Surplus idle connection -> mark for closing, exclude. + if new_connections is self._connections: + new_connections = self._connections[:i] + closing_connections.append(connection) + else: + if new_connections is not self._connections: + new_connections.append(connection) + + if new_connections is not self._connections: + self._connections = new_connections + + # Assign queued requests to connections. Remove closed requests inline. + i_req = 0 + while i_req < len(self._requests): + pool_request = self._requests[i_req] + if pool_request.is_closed(): + self._requests.pop(i_req) + continue + if not pool_request.is_queued(): + i_req += 1 + continue origin = pool_request.request.url.origin - available_connections = [ - connection - for connection in self._connections - if connection.can_handle_request(origin) and connection.is_available() - ] - idle_connections = [ - connection for connection in self._connections if connection.is_idle() - ] + + first_available_connection: ConnectionInterface | None = None + first_idle_connection: ConnectionInterface | None = None + + # Prepare available/idle connections while purging closed/expired ones. + # Iterate in-place using index to safely remove while scanning. + i = 0 + while i < len(self._connections): + connection = self._connections[i] + + if connection.is_closed(): + # log: "removing closed connection" + self._connections.pop(i) + continue + if connection.has_expired(): + # log: "closing expired connection" + closing_connections.append(connection) + self._connections.pop(i) + continue + + if first_available_connection is None and connection.can_handle_request(origin) and connection.is_available(): + first_available_connection = connection + + if first_idle_connection is None and connection.is_idle(): + first_idle_connection = connection + + i += 1 # There are three cases for how we may be able to handle the request: # @@ -317,25 +356,28 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: # 2. We can create a new connection to handle the request. # 3. We can close an idle connection and then create a new connection # to handle the request. - if available_connections: + if first_available_connection is not None: # log: "reusing existing connection" - connection = available_connections[0] + connection = first_available_connection pool_request.assign_to_connection(connection) elif len(self._connections) < self._max_connections: # log: "creating new connection" connection = self.create_connection(origin) self._connections.append(connection) pool_request.assign_to_connection(connection) - elif idle_connections: + elif first_idle_connection is not None: # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) + connection = first_idle_connection + if connection in self._connections: + self._connections.remove(connection) closing_connections.append(connection) # log: "creating new connection" connection = self.create_connection(origin) self._connections.append(connection) pool_request.assign_to_connection(connection) + i_req += 1 + return closing_connections def _close_connections(self, closing: list[ConnectionInterface]) -> None: @@ -412,9 +454,20 @@ def close(self) -> None: with ShieldCancellation(): if hasattr(self._stream, "close"): self._stream.close() - - with self._pool._optional_thread_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() - - self._pool._close_connections(closing) + + # The request is removed from the queue in _assign_requests_to_connections(). + self._pool_request.close() + + need_assign = self._pool._max_keepalive_connections < self._pool._max_connections + if not need_assign: + for r in self._pool._requests: + if r.is_queued(): + need_assign = True + break + + if need_assign: + with self._pool._optional_thread_lock: + closing = self._pool._assign_requests_to_connections() + + if closing: + self._pool._close_connections(closing)