Skip to content

Commit c5ce8dd

Browse files
committed
wip
1 parent a84a181 commit c5ce8dd

File tree

4 files changed

+28
-29
lines changed

4 files changed

+28
-29
lines changed

pymongo/asynchronous/pool.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,16 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
10291029
self.requests -= 1
10301030
self.size_cond.notify()
10311031

1032+
def _handle_connection_error(self, error: Exception, phase: str) -> None:
1033+
# Handle system overload condition. When the base AutoReconnect is
1034+
# raised and we are not an sdam pool, add to backoff and add the
1035+
# appropriate error label.
1036+
if not self.is_sdam and type(error) == AutoReconnect:
1037+
self._backoff += 1
1038+
error._add_error_label("SystemOverloaded")
1039+
error._add_error_label("Retryable")
1040+
print(f"Setting backoff in {phase}:", self._backoff) # noqa: T201
1041+
10321042
async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection:
10331043
"""Connect to Mongo and return a new AsyncConnection.
10341044
@@ -1066,7 +1076,6 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10661076
networking_interface = await _configured_protocol_interface(self.address, self.opts)
10671077
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10681078
except BaseException as error:
1069-
print("Got the TLS handshake error") # noqa: T201
10701079
async with self.lock:
10711080
self.active_contexts.discard(tmp_context)
10721081
if self.enabled_for_cmap:
@@ -1085,10 +1094,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10851094
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
10861095
error=ConnectionClosedReason.ERROR,
10871096
)
1097+
self._handle_connection_error(error, "handshake")
10881098
if isinstance(error, (IOError, OSError, *SSLErrors)):
10891099
details = _get_timeout_details(self.opts)
10901100
_raise_connection_failure(self.address, error, timeout_details=details)
1091-
10921101
raise
10931102

10941103
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
@@ -1109,14 +1118,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11091118
except BaseException as e:
11101119
async with self.lock:
11111120
self.active_contexts.discard(conn.cancel_context)
1112-
# Handle system overload condition. When the base AutoReconnect is
1113-
# raised and we are not an sdam pool, add to backoff and add the
1114-
# appropriate error label.
1115-
if not self.is_sdam and type(e) == AutoReconnect:
1116-
self._backoff += 1
1117-
e._add_error_label("SystemOverloaded")
1118-
e._add_error_label("Retryable")
1119-
print("Setting backoff:", self._backoff) # noqa: T201
1121+
self._handle_connection_error(e, "hello")
11201122
await conn.close_conn(ConnectionClosedReason.ERROR)
11211123
raise
11221124

pymongo/network_layer.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ class PyMongoBaseProtocol(Protocol):
254254
def __init__(self, timeout: Optional[float] = None):
255255
self.transport: Transport = None # type: ignore[assignment]
256256
self._timeout = timeout
257-
self._connection_made = asyncio.get_running_loop().create_future()
257+
self._closing_error = asyncio.get_running_loop().create_future()
258258
self._closed = asyncio.get_running_loop().create_future()
259259
self._connection_lost = False
260260

@@ -271,13 +271,9 @@ def close(self, exc: Optional[Exception] = None) -> None:
271271
self._resolve_pending(exc)
272272
self._connection_lost = True
273273

274-
def connection_made(self, transport: BaseTransport) -> None:
275-
super().connection_made(transport)
276-
self._connection_made.set_result(None)
277-
278274
def connection_lost(self, exc: Optional[Exception] = None) -> None:
279-
if exc is not None and not self._connection_made.done():
280-
self._connection_made.set_exception(exc)
275+
if exc is not None and not self._closing_error.done():
276+
self._closing_error.set_exception(exc)
281277
self._resolve_pending(exc)
282278
if not self._closed.done():
283279
self._closed.set_result(None)
@@ -344,7 +340,7 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[
344340
message = await self._done_messages.popleft()
345341
else:
346342
if self.transport and self.transport.is_closing():
347-
raise OSError("connection is already closed")
343+
return await self._closing_error
348344
read_waiter = asyncio.get_running_loop().create_future()
349345
self._pending_messages.append(read_waiter)
350346
try:

pymongo/pool_shared.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ async def _configured_protocol_interface(
278278
server_hostname=host,
279279
ssl=ssl_context,
280280
)
281-
await protocol._connection_made
282281
except _CertificateError:
283282
# Raise _CertificateError directly like we do after match_hostname
284283
# below.

pymongo/synchronous/pool.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,16 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
10251025
self.requests -= 1
10261026
self.size_cond.notify()
10271027

1028+
def _handle_connection_error(self, error: Exception, phase: str) -> None:
1029+
# Handle system overload condition. When the base AutoReconnect is
1030+
# raised and we are not an sdam pool, add to backoff and add the
1031+
# appropriate error label.
1032+
if not self.is_sdam and type(error) == AutoReconnect:
1033+
self._backoff += 1
1034+
error._add_error_label("SystemOverloaded")
1035+
error._add_error_label("Retryable")
1036+
print(f"Setting backoff in {phase}:", self._backoff) # noqa: T201
1037+
10281038
def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection:
10291039
"""Connect to Mongo and return a new Connection.
10301040
@@ -1062,7 +1072,6 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10621072
networking_interface = _configured_socket_interface(self.address, self.opts)
10631073
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10641074
except BaseException as error:
1065-
print("Got the TLS handshake error") # noqa: T201
10661075
with self.lock:
10671076
self.active_contexts.discard(tmp_context)
10681077
if self.enabled_for_cmap:
@@ -1081,10 +1090,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10811090
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
10821091
error=ConnectionClosedReason.ERROR,
10831092
)
1093+
self._handle_connection_error(error, "handshake")
10841094
if isinstance(error, (IOError, OSError, *SSLErrors)):
10851095
details = _get_timeout_details(self.opts)
10861096
_raise_connection_failure(self.address, error, timeout_details=details)
1087-
10881097
raise
10891098

10901099
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
@@ -1105,14 +1114,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
11051114
except BaseException as e:
11061115
with self.lock:
11071116
self.active_contexts.discard(conn.cancel_context)
1108-
# Handle system overload condition. When the base AutoReconnect is
1109-
# raised and we are not an sdam pool, add to backoff and add the
1110-
# appropriate error label.
1111-
if not self.is_sdam and type(e) == AutoReconnect:
1112-
self._backoff += 1
1113-
e._add_error_label("SystemOverloaded")
1114-
e._add_error_label("Retryable")
1115-
print("Setting backoff:", self._backoff) # noqa: T201
1117+
self._handle_connection_error(e, "hello")
11161118
conn.close_conn(ConnectionClosedReason.ERROR)
11171119
raise
11181120

0 commit comments

Comments
 (0)