Skip to content

Commit 1f910b5

Browse files
authored
PYTHON-4494 - AsyncMongoClient._cleanup_cursor needs to be synchronous (mongodb#1680)
1 parent bba5f81 commit 1f910b5

File tree

8 files changed

+204
-100
lines changed

8 files changed

+204
-100
lines changed

pymongo/asynchronous/client_session.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,12 @@ async def _end_session(self, lock: bool) -> None:
531531
self._client._return_server_session(self._server_session)
532532
self._server_session = None
533533

534+
def _end_implicit_session(self) -> None:
535+
# Implicit sessions can't be part of transactions or pinned connections
536+
if self._server_session is not None:
537+
self._client._return_server_session(self._server_session)
538+
self._server_session = None
539+
534540
def _check_ended(self) -> None:
535541
if self._server_session is None:
536542
raise InvalidOperation("Cannot use ended session")

pymongo/asynchronous/command_cursor.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ def __init__(
8181
self._explicit_session = explicit_session
8282
self._killed = self._id == 0
8383
self._comment = comment
84-
if _IS_SYNC and self._killed:
85-
self._end_session(True) # type: ignore[unused-coroutine]
84+
if self._killed:
85+
self._end_session()
8686

8787
if "ns" in cursor_info: # noqa: SIM401
8888
self._ns = cursor_info["ns"]
@@ -95,8 +95,7 @@ def __init__(
9595
raise TypeError("max_await_time_ms must be an integer or None")
9696

9797
def __del__(self) -> None:
98-
if _IS_SYNC:
99-
self._die(False) # type: ignore[unused-coroutine]
98+
self._die_no_lock()
10099

101100
def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
102101
"""Limits the number of documents returned in one batch. Each batch
@@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]:
198197
return self._session
199198
return None
200199

201-
async def _die(self, synchronous: bool = False) -> None:
202-
"""Closes this cursor."""
200+
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
203201
already_killed = self._killed
204202
self._killed = True
205203
if self._id and not already_killed:
@@ -210,8 +208,22 @@ async def _die(self, synchronous: bool = False) -> None:
210208
# Skip killCursors.
211209
cursor_id = 0
212210
address = None
213-
await self._collection.database.client._cleanup_cursor(
214-
synchronous,
211+
return cursor_id, address
212+
213+
def _die_no_lock(self) -> None:
214+
"""Closes this cursor without acquiring a lock."""
215+
cursor_id, address = self._prepare_to_die()
216+
self._collection.database.client._cleanup_cursor_no_lock(
217+
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
218+
)
219+
if not self._explicit_session:
220+
self._session = None
221+
self._sock_mgr = None
222+
223+
async def _die_lock(self) -> None:
224+
"""Closes this cursor."""
225+
cursor_id, address = self._prepare_to_die()
226+
await self._collection.database.client._cleanup_cursor_lock(
215227
cursor_id,
216228
address,
217229
self._sock_mgr,
@@ -222,14 +234,14 @@ async def _die(self, synchronous: bool = False) -> None:
222234
self._session = None
223235
self._sock_mgr = None
224236

225-
async def _end_session(self, synchronous: bool) -> None:
237+
def _end_session(self) -> None:
226238
if self._session and not self._explicit_session:
227-
await self._session._end_session(lock=synchronous)
239+
self._session._end_implicit_session()
228240
self._session = None
229241

230242
async def close(self) -> None:
231243
"""Explicitly close / kill this cursor."""
232-
await self._die(True)
244+
await self._die_lock()
233245

234246
async def _send_message(self, operation: _GetMore) -> None:
235247
"""Send a getmore message and handle the response."""
@@ -243,7 +255,7 @@ async def _send_message(self, operation: _GetMore) -> None:
243255
# Don't send killCursors because the cursor is already closed.
244256
self._killed = True
245257
if exc.timeout:
246-
await self._die(False)
258+
self._die_no_lock()
247259
else:
248260
# Return the session and pinned connection, if necessary.
249261
await self.close()
@@ -305,7 +317,7 @@ async def _refresh(self) -> int:
305317
)
306318
)
307319
else: # Cursor id is zero nothing else to return
308-
await self._die(True)
320+
await self._die_lock()
309321

310322
return len(self._data)
311323

pymongo/asynchronous/cursor.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,7 @@ def retrieved(self) -> int:
259259
return self._retrieved
260260

261261
def __del__(self) -> None:
262-
if _IS_SYNC:
263-
self._die() # type: ignore[unused-coroutine]
262+
self._die_no_lock()
264263

265264
def clone(self) -> AsyncCursor[_DocumentType]:
266265
"""Get a clone of this cursor.
@@ -996,14 +995,7 @@ def _deepcopy(
996995
y[key] = value
997996
return y
998997

999-
async def _die(self, synchronous: bool = False) -> None:
1000-
"""Closes this cursor."""
1001-
try:
1002-
already_killed = self._killed
1003-
except AttributeError:
1004-
# ___init__ did not run to completion (or at all).
1005-
return
1006-
998+
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
1007999
self._killed = True
10081000
if self._id and not already_killed:
10091001
cursor_id = self._id
@@ -1013,8 +1005,34 @@ async def _die(self, synchronous: bool = False) -> None:
10131005
# Skip killCursors.
10141006
cursor_id = 0
10151007
address = None
1016-
await self._collection.database.client._cleanup_cursor(
1017-
synchronous,
1008+
return cursor_id, address
1009+
1010+
def _die_no_lock(self) -> None:
1011+
"""Closes this cursor without acquiring a lock."""
1012+
try:
1013+
already_killed = self._killed
1014+
except AttributeError:
1015+
# ___init__ did not run to completion (or at all).
1016+
return
1017+
1018+
cursor_id, address = self._prepare_to_die(already_killed)
1019+
self._collection.database.client._cleanup_cursor_no_lock(
1020+
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
1021+
)
1022+
if not self._explicit_session:
1023+
self._session = None
1024+
self._sock_mgr = None
1025+
1026+
async def _die_lock(self) -> None:
1027+
"""Closes this cursor."""
1028+
try:
1029+
already_killed = self._killed
1030+
except AttributeError:
1031+
# ___init__ did not run to completion (or at all).
1032+
return
1033+
1034+
cursor_id, address = self._prepare_to_die(already_killed)
1035+
await self._collection.database.client._cleanup_cursor_lock(
10181036
cursor_id,
10191037
address,
10201038
self._sock_mgr,
@@ -1027,7 +1045,7 @@ async def _die(self, synchronous: bool = False) -> None:
10271045

10281046
async def close(self) -> None:
10291047
"""Explicitly close / kill this cursor."""
1030-
await self._die(True)
1048+
await self._die_lock()
10311049

10321050
async def distinct(self, key: str) -> list:
10331051
"""Get a list of distinct values for `key` among all documents
@@ -1080,7 +1098,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None:
10801098
# Don't send killCursors because the cursor is already closed.
10811099
self._killed = True
10821100
if exc.timeout:
1083-
await self._die(False)
1101+
self._die_no_lock()
10841102
else:
10851103
await self.close()
10861104
# If this is a tailable cursor the error is likely

pymongo/asynchronous/mongo_client.py

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1857,48 +1857,63 @@ async def _retryable_write(
18571857
async with self._tmp_session(session) as s:
18581858
return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
18591859

1860-
async def _cleanup_cursor(
1860+
def _cleanup_cursor_no_lock(
18611861
self,
1862-
locks_allowed: bool,
18631862
cursor_id: int,
18641863
address: Optional[_CursorAddress],
18651864
conn_mgr: _ConnectionManager,
18661865
session: Optional[ClientSession],
18671866
explicit_session: bool,
18681867
) -> None:
1869-
"""Cleanup a cursor from cursor.close() or __del__.
1868+
"""Cleanup a cursor from __del__ without locking.
1869+
1870+
This method handles cleanup for Cursors/CommandCursors including any
1871+
pinned connection attached at the time the cursor
1872+
was garbage collected.
1873+
1874+
:param cursor_id: The cursor id which may be 0.
1875+
:param address: The _CursorAddress.
1876+
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
1877+
"""
1878+
# The cursor will be closed later in a different session.
1879+
if cursor_id or conn_mgr:
1880+
self._close_cursor_soon(cursor_id, address, conn_mgr)
1881+
if session and not explicit_session:
1882+
session._end_implicit_session()
1883+
1884+
async def _cleanup_cursor_lock(
1885+
self,
1886+
cursor_id: int,
1887+
address: Optional[_CursorAddress],
1888+
conn_mgr: _ConnectionManager,
1889+
session: Optional[ClientSession],
1890+
explicit_session: bool,
1891+
) -> None:
1892+
"""Cleanup a cursor from cursor.close() using a lock.
18701893
18711894
This method handles cleanup for Cursors/CommandCursors including any
18721895
pinned connection or implicit session attached at the time the cursor
18731896
was closed or garbage collected.
18741897
1875-
:param locks_allowed: True if we are allowed to acquire locks.
18761898
:param cursor_id: The cursor id which may be 0.
18771899
:param address: The _CursorAddress.
18781900
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
18791901
:param session: The cursor's session.
18801902
:param explicit_session: True if the session was passed explicitly.
18811903
"""
1882-
if locks_allowed:
1883-
if cursor_id:
1884-
if conn_mgr and conn_mgr.more_to_come:
1885-
# If this is an exhaust cursor and we haven't completely
1886-
# exhausted the result set we *must* close the socket
1887-
# to stop the server from sending more data.
1888-
assert conn_mgr.conn is not None
1889-
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
1890-
else:
1891-
await self._close_cursor_now(
1892-
cursor_id, address, session=session, conn_mgr=conn_mgr
1893-
)
1894-
if conn_mgr:
1895-
await conn_mgr.close()
1896-
else:
1897-
# The cursor will be closed later in a different session.
1898-
if cursor_id or conn_mgr:
1899-
self._close_cursor_soon(cursor_id, address, conn_mgr)
1904+
if cursor_id:
1905+
if conn_mgr and conn_mgr.more_to_come:
1906+
# If this is an exhaust cursor and we haven't completely
1907+
# exhausted the result set we *must* close the socket
1908+
# to stop the server from sending more data.
1909+
assert conn_mgr.conn is not None
1910+
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
1911+
else:
1912+
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
1913+
if conn_mgr:
1914+
await conn_mgr.close()
19001915
if session and not explicit_session:
1901-
await session._end_session(lock=locks_allowed)
1916+
session._end_implicit_session()
19021917

19031918
async def _close_cursor_now(
19041919
self,
@@ -1978,7 +1993,7 @@ async def _process_kill_cursors(self) -> None:
19781993

19791994
for address, cursor_id, conn_mgr in pinned_cursors:
19801995
try:
1981-
await self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False)
1996+
await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
19821997
except Exception as exc:
19831998
if isinstance(exc, InvalidOperation) and self._topology._closed:
19841999
# Raise the exception when client is closed so that it

pymongo/synchronous/client_session.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,12 @@ def _end_session(self, lock: bool) -> None:
531531
self._client._return_server_session(self._server_session)
532532
self._server_session = None
533533

534+
def _end_implicit_session(self) -> None:
535+
# Implicit sessions can't be part of transactions or pinned connections
536+
if self._server_session is not None:
537+
self._client._return_server_session(self._server_session)
538+
self._server_session = None
539+
534540
def _check_ended(self) -> None:
535541
if self._server_session is None:
536542
raise InvalidOperation("Cannot use ended session")

pymongo/synchronous/command_cursor.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ def __init__(
8181
self._explicit_session = explicit_session
8282
self._killed = self._id == 0
8383
self._comment = comment
84-
if _IS_SYNC and self._killed:
85-
self._end_session(True) # type: ignore[unused-coroutine]
84+
if self._killed:
85+
self._end_session()
8686

8787
if "ns" in cursor_info: # noqa: SIM401
8888
self._ns = cursor_info["ns"]
@@ -95,8 +95,7 @@ def __init__(
9595
raise TypeError("max_await_time_ms must be an integer or None")
9696

9797
def __del__(self) -> None:
98-
if _IS_SYNC:
99-
self._die(False) # type: ignore[unused-coroutine]
98+
self._die_no_lock()
10099

101100
def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]:
102101
"""Limits the number of documents returned in one batch. Each batch
@@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]:
198197
return self._session
199198
return None
200199

201-
def _die(self, synchronous: bool = False) -> None:
202-
"""Closes this cursor."""
200+
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
203201
already_killed = self._killed
204202
self._killed = True
205203
if self._id and not already_killed:
@@ -210,8 +208,22 @@ def _die(self, synchronous: bool = False) -> None:
210208
# Skip killCursors.
211209
cursor_id = 0
212210
address = None
213-
self._collection.database.client._cleanup_cursor(
214-
synchronous,
211+
return cursor_id, address
212+
213+
def _die_no_lock(self) -> None:
214+
"""Closes this cursor without acquiring a lock."""
215+
cursor_id, address = self._prepare_to_die()
216+
self._collection.database.client._cleanup_cursor_no_lock(
217+
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
218+
)
219+
if not self._explicit_session:
220+
self._session = None
221+
self._sock_mgr = None
222+
223+
def _die_lock(self) -> None:
224+
"""Closes this cursor."""
225+
cursor_id, address = self._prepare_to_die()
226+
self._collection.database.client._cleanup_cursor_lock(
215227
cursor_id,
216228
address,
217229
self._sock_mgr,
@@ -222,14 +234,14 @@ def _die(self, synchronous: bool = False) -> None:
222234
self._session = None
223235
self._sock_mgr = None
224236

225-
def _end_session(self, synchronous: bool) -> None:
237+
def _end_session(self) -> None:
226238
if self._session and not self._explicit_session:
227-
self._session._end_session(lock=synchronous)
239+
self._session._end_implicit_session()
228240
self._session = None
229241

230242
def close(self) -> None:
231243
"""Explicitly close / kill this cursor."""
232-
self._die(True)
244+
self._die_lock()
233245

234246
def _send_message(self, operation: _GetMore) -> None:
235247
"""Send a getmore message and handle the response."""
@@ -243,7 +255,7 @@ def _send_message(self, operation: _GetMore) -> None:
243255
# Don't send killCursors because the cursor is already closed.
244256
self._killed = True
245257
if exc.timeout:
246-
self._die(False)
258+
self._die_no_lock()
247259
else:
248260
# Return the session and pinned connection, if necessary.
249261
self.close()
@@ -305,7 +317,7 @@ def _refresh(self) -> int:
305317
)
306318
)
307319
else: # Cursor id is zero nothing else to return
308-
self._die(True)
320+
self._die_lock()
309321

310322
return len(self._data)
311323

0 commit comments

Comments
 (0)