Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,9 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
async with self.lock:
conn_id = self.next_connection_id
self.next_connection_id += 1
# Use a temporary context so that interrupt_connections can cancel creating the socket.
tmp_context = _CancellationContext()
self.active_contexts.add(tmp_context)

listeners = self.opts._event_listeners
if self.enabled_for_cmap:
Expand All @@ -1267,6 +1270,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
try:
sock = await _configured_socket(self.address, self.opts)
except BaseException as error:
async with self.lock:
self.active_contexts.discard(tmp_context)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_closed(
Expand All @@ -1292,6 +1297,9 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
conn = AsyncConnection(sock, self, self.address, conn_id) # type: ignore[arg-type]
async with self.lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
try:
if self.handshake:
await conn.hello()
Expand All @@ -1301,6 +1309,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A

await conn.authenticate()
except BaseException:
async with self.lock:
self.active_contexts.discard(conn.cancel_context)
conn.close_conn(ConnectionClosedReason.ERROR)
raise

Expand Down
10 changes: 10 additions & 0 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,9 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
with self.lock:
conn_id = self.next_connection_id
self.next_connection_id += 1
# Use a temporary context so that interrupt_connections can cancel creating the socket.
tmp_context = _CancellationContext()
self.active_contexts.add(tmp_context)

listeners = self.opts._event_listeners
if self.enabled_for_cmap:
Expand All @@ -1261,6 +1264,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
try:
sock = _configured_socket(self.address, self.opts)
except BaseException as error:
with self.lock:
self.active_contexts.discard(tmp_context)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_closed(
Expand All @@ -1286,6 +1291,9 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
conn = Connection(sock, self, self.address, conn_id) # type: ignore[arg-type]
with self.lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
try:
if self.handshake:
conn.hello()
Expand All @@ -1295,6 +1303,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect

conn.authenticate()
except BaseException:
with self.lock:
self.active_contexts.discard(conn.cancel_context)
conn.close_conn(ConnectionClosedReason.ERROR)
raise

Expand Down
5 changes: 0 additions & 5 deletions test/test_connection_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,6 @@ def set_fail_point(self, command_args):

def run_scenario(self, scenario_def, test):
"""Run a CMAP spec test."""
if (
scenario_def["description"]
== "clear with interruptInUseConnections = true closes pending connections"
):
self.skipTest("Skip pending PYTHON-4414")
self.logs: list = []
self.assertEqual(scenario_def["version"], 1)
self.assertIn(scenario_def["style"], ["unit", "integration"])
Expand Down
Loading