Skip to content

Commit 1b2a599

Browse files
committed
PYTHON-1272 Fix deadlock when garbage collecting pinned cursors and sessions (#642)
It's not safe to return the pinned connection to the pool from within Cursor.del because the Pool's lock may be held by a python thread while the cyclic garbage collector runs. Instead we send the cursor cleanup request to the client's background thread. The thread will send killCursors on the pinned socket and then return the socket to the pool. Also fixed a similar bug when garbage collecting a pinned session. (cherry picked from commit 6bc5e08)
1 parent b631313 commit 1b2a599

File tree

8 files changed

+226
-66
lines changed

8 files changed

+226
-66
lines changed

pymongo/client_session.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,14 +288,15 @@ class _TxnState(object):
288288

289289
class _Transaction(object):
290290
"""Internal class to hold transaction information in a ClientSession."""
291-
def __init__(self, opts):
291+
def __init__(self, opts, client):
292292
self.opts = opts
293293
self.state = _TxnState.NONE
294294
self.sharded = False
295295
self.pinned_address = None
296296
self.sock_mgr = None
297297
self.recovery_token = None
298298
self.attempt = 0
299+
self.client = client
299300

300301
def active(self):
301302
return self.state in (_TxnState.STARTING, _TxnState.IN_PROGRESS)
@@ -329,6 +330,13 @@ def reset(self):
329330
self.recovery_token = None
330331
self.attempt = 0
331332

333+
def __del__(self):
334+
if self.sock_mgr:
335+
# Reuse the cursor closing machinery to return the socket to the
336+
# pool soon.
337+
self.client._close_cursor_soon(0, None, self.sock_mgr)
338+
self.sock_mgr = None
339+
332340

333341
def _reraise_with_unknown_commit(exc):
334342
"""Re-raise an exception with the UnknownTransactionCommitResult label."""
@@ -382,7 +390,7 @@ def __init__(self, client, server_session, options, authset, implicit):
382390
self._operation_time = None
383391
# Is this an implicitly created session?
384392
self._implicit = implicit
385-
self._transaction = _Transaction(None)
393+
self._transaction = _Transaction(None, client)
386394

387395
def end_session(self):
388396
"""Finish this session. If a transaction has started, abort it.

pymongo/command_cursor.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,29 +65,31 @@ def __init__(self, collection, cursor_info, address, retrieved=0,
6565
raise TypeError("max_await_time_ms must be an integer or None")
6666

6767
def __del__(self):
68-
if self.__id and not self.__killed:
69-
self.__die()
68+
self.__die()
7069

7170
def __die(self, synchronous=False):
7271
"""Closes this cursor.
7372
"""
7473
already_killed = self.__killed
7574
self.__killed = True
7675
if self.__id and not already_killed:
76+
cursor_id = self.__id
7777
address = _CursorAddress(
7878
self.__address, self.__collection.full_name)
79-
if synchronous:
80-
self.__collection.database.client._close_cursor_now(
81-
self.__id, address, session=self.__session,
82-
sock_mgr=self.__sock_mgr)
83-
else:
84-
# The cursor will be closed later in a different session.
85-
self.__collection.database.client._close_cursor(
86-
self.__id, address)
87-
if self.__sock_mgr:
88-
self.__sock_mgr.close()
89-
self.__sock_mgr = None
90-
self.__end_session(synchronous)
79+
else:
80+
# Skip killCursors.
81+
cursor_id = 0
82+
address = None
83+
self.__collection.database.client._cleanup_cursor(
84+
synchronous,
85+
cursor_id,
86+
address,
87+
self.__sock_mgr,
88+
self.__session,
89+
self.__explicit_session)
90+
if not self.__explicit_session:
91+
self.__session = None
92+
self.__sock_mgr = None
9193

9294
def __end_session(self, synchronous):
9395
if self.__session and not self.__explicit_session:
@@ -186,7 +188,7 @@ def __send_message(self, operation):
186188
self.__id = response.data.cursor_id
187189

188190
if self.__id == 0:
189-
self.__die(True)
191+
self.close()
190192
self.__data = deque(documents)
191193

192194
def _unpack_response(self, response, cursor_id, codec_options,

pymongo/cursor.py

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
_RawBatchGetMore,
3838
_Query,
3939
_RawBatchQuery)
40-
from pymongo.monitoring import ConnectionClosedReason
4140
from pymongo.response import PinnedResponse
4241

4342
# These errors mean that the server has already killed the cursor so there is
@@ -109,28 +108,23 @@ class CursorType(object):
109108
"""
110109

111110

112-
# This has to be an old style class due to
113-
# http://bugs.jython.org/issue1057
114-
class _SocketManager:
111+
class _SocketManager(object):
115112
"""Used with exhaust cursors to ensure the socket is returned.
116113
"""
117114
def __init__(self, sock, more_to_come):
118115
self.sock = sock
119116
self.more_to_come = more_to_come
120-
self.__closed = False
117+
self.closed = False
121118
self.lock = threading.Lock()
122119

123-
def __del__(self):
124-
self.close()
125-
126120
def update_exhaust(self, more_to_come):
127121
self.more_to_come = more_to_come
128122

129123
def close(self):
130124
"""Return this instance's socket to the connection pool.
131125
"""
132-
if not self.__closed:
133-
self.__closed = True
126+
if not self.closed:
127+
self.closed = True
134128
self.sock.unpin()
135129
self.sock = None
136130

@@ -159,6 +153,7 @@ def __init__(self, collection, filter=None, projection=None, skip=0,
159153
"""
160154
# Initialize all attributes used in __del__ before possibly raising
161155
# an error to avoid attribute errors during garbage collection.
156+
self.__collection = collection
162157
self.__id = None
163158
self.__exhaust = False
164159
self.__sock_mgr = None
@@ -211,7 +206,6 @@ def __init__(self, collection, filter=None, projection=None, skip=0,
211206
projection = {"_id": 1}
212207
projection = helpers._fields_list_to_dict(projection, "projection")
213208

214-
self.__collection = collection
215209
self.__spec = spec
216210
self.__projection = projection
217211
self.__skip = skip
@@ -297,6 +291,7 @@ def rewind(self):
297291
be sent to the server, even if the resultant data has already been
298292
retrieved by this cursor.
299293
"""
294+
self.close()
300295
self.__data = deque()
301296
self.__id = None
302297
self.__address = None
@@ -353,29 +348,23 @@ def __die(self, synchronous=False):
353348

354349
self.__killed = True
355350
if self.__id and not already_killed:
356-
if self.__exhaust and self.__sock_mgr:
357-
# If this is an exhaust cursor and we haven't completely
358-
# exhausted the result set we *must* close the socket
359-
# to stop the server from sending more data.
360-
self.__sock_mgr.sock.close_socket(
361-
ConnectionClosedReason.ERROR)
362-
else:
363-
address = _CursorAddress(
364-
self.__address, self.__collection.full_name)
365-
if synchronous:
366-
self.__collection.database.client._close_cursor_now(
367-
self.__id, address, session=self.__session,
368-
sock_mgr=self.__sock_mgr)
369-
else:
370-
# The cursor will be closed later in a different session.
371-
self.__collection.database.client._close_cursor(
372-
self.__id, address)
373-
if self.__sock_mgr:
374-
self.__sock_mgr.close()
375-
self.__sock_mgr = None
376-
if self.__session and not self.__explicit_session:
377-
self.__session._end_session(lock=synchronous)
351+
cursor_id = self.__id
352+
address = _CursorAddress(
353+
self.__address, self.__collection.full_name)
354+
else:
355+
# Skip killCursors.
356+
cursor_id = 0
357+
address = None
358+
self.__collection.database.client._cleanup_cursor(
359+
synchronous,
360+
cursor_id,
361+
address,
362+
self.__sock_mgr,
363+
self.__session,
364+
self.__explicit_session)
365+
if not self.__explicit_session:
378366
self.__session = None
367+
self.__sock_mgr = None
379368

380369
def close(self):
381370
"""Explicitly close / kill this cursor.
@@ -1098,10 +1087,10 @@ def __send_message(self, operation):
10981087
if self.__id == 0:
10991088
# Don't wait for garbage collection to call __del__, return the
11001089
# socket and the session to the pool now.
1101-
self.__die()
1090+
self.close()
11021091

11031092
if self.__limit and self.__id and self.__limit <= self.__retrieved:
1104-
self.__die()
1093+
self.close()
11051094

11061095
def _unpack_response(self, response, cursor_id, codec_options,
11071096
user_fields=None, legacy_response=False):

pymongo/mongo_client.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,10 +1648,47 @@ def close_cursor(self, cursor_id, address=None):
16481648
if not isinstance(cursor_id, integer_types):
16491649
raise TypeError("cursor_id must be an instance of (int, long)")
16501650

1651-
self._close_cursor(cursor_id, address)
1651+
self._close_cursor_soon(cursor_id, address)
16521652

1653-
def _close_cursor(self, cursor_id, address):
1654-
"""Send a kill cursors message with the given id.
1653+
def _cleanup_cursor(self, locks_allowed, cursor_id, address, sock_mgr,
1654+
session, explicit_session):
1655+
"""Cleanup a cursor from cursor.close() or __del__.
1656+
1657+
This method handles cleanup for Cursors/CommandCursors including any
1658+
pinned connection or implicit session attached at the time the cursor
1659+
was closed or garbage collected.
1660+
1661+
:Parameters:
1662+
- `locks_allowed`: True if we are allowed to acquire locks.
1663+
- `cursor_id`: The cursor id which may be 0.
1664+
- `address`: The _CursorAddress.
1665+
- `sock_mgr`: The _SocketManager for the pinned connection or None.
1666+
- `session`: The cursor's session.
1667+
- `explicit_session`: True if the session was passed explicitly.
1668+
"""
1669+
if locks_allowed:
1670+
if cursor_id:
1671+
if sock_mgr and sock_mgr.more_to_come:
1672+
# If this is an exhaust cursor and we haven't completely
1673+
# exhausted the result set we *must* close the socket
1674+
# to stop the server from sending more data.
1675+
sock_mgr.sock.close_socket(
1676+
ConnectionClosedReason.ERROR)
1677+
else:
1678+
self._close_cursor_now(
1679+
cursor_id, address, session=session,
1680+
sock_mgr=sock_mgr)
1681+
if sock_mgr:
1682+
sock_mgr.close()
1683+
else:
1684+
# The cursor will be closed later in a different session.
1685+
if cursor_id or sock_mgr:
1686+
self._close_cursor_soon(cursor_id, address, sock_mgr)
1687+
if session and not explicit_session:
1688+
session._end_session(lock=locks_allowed)
1689+
1690+
def _close_cursor_soon(self, cursor_id, address, sock_mgr=None):
1691+
"""Request that a cursor and/or connection be cleaned up soon
16551692
16561693
What closing the cursor actually means depends on this client's
16571694
cursor manager. If there is none, the cursor is closed asynchronously
@@ -1660,7 +1697,7 @@ def _close_cursor(self, cursor_id, address):
16601697
if self.__cursor_manager is not None:
16611698
self.__cursor_manager.close(cursor_id, address)
16621699
else:
1663-
self.__kill_cursors_queue.append((address, [cursor_id]))
1700+
self.__kill_cursors_queue.append((address, cursor_id, sock_mgr))
16641701

16651702
def _close_cursor_now(self, cursor_id, address=None, session=None,
16661703
sock_mgr=None):
@@ -1687,7 +1724,7 @@ def _close_cursor_now(self, cursor_id, address=None, session=None,
16871724
[cursor_id], address, self._get_topology(), session)
16881725
except PyMongoError:
16891726
# Make another attempt to kill the cursor later.
1690-
self.__kill_cursors_queue.append((address, [cursor_id]))
1727+
self._close_cursor_soon(cursor_id, address)
16911728

16921729
def kill_cursors(self, cursor_ids, address=None):
16931730
"""DEPRECATED - Send a kill cursors message soon with the given ids.
@@ -1718,7 +1755,8 @@ def kill_cursors(self, cursor_ids, address=None):
17181755
raise TypeError("cursor_ids must be a list")
17191756

17201757
# "Atomic", needs no lock.
1721-
self.__kill_cursors_queue.append((address, cursor_ids))
1758+
for cursor_id in cursor_ids:
1759+
self.__kill_cursors_queue.append((address, cursor_id, None))
17221760

17231761
def _kill_cursors(self, cursor_ids, address, topology, session):
17241762
"""Send a kill cursors message with the given ids."""
@@ -1783,15 +1821,26 @@ def _kill_cursor_impl(self, cursor_ids, address, session, sock_info):
17831821
def _process_kill_cursors(self):
17841822
"""Process any pending kill cursors requests."""
17851823
address_to_cursor_ids = defaultdict(list)
1824+
pinned_cursors = []
17861825

17871826
# Other threads or the GC may append to the queue concurrently.
17881827
while True:
17891828
try:
1790-
address, cursor_ids = self.__kill_cursors_queue.pop()
1829+
address, cursor_id, sock_mgr = self.__kill_cursors_queue.pop()
17911830
except IndexError:
17921831
break
17931832

1794-
address_to_cursor_ids[address].extend(cursor_ids)
1833+
if sock_mgr:
1834+
pinned_cursors.append((address, cursor_id, sock_mgr))
1835+
else:
1836+
address_to_cursor_ids[address].append(cursor_id)
1837+
1838+
for address, cursor_id, sock_mgr in pinned_cursors:
1839+
try:
1840+
self._cleanup_cursor(True, cursor_id, address, sock_mgr,
1841+
None, False)
1842+
except Exception:
1843+
helpers._handle_exception()
17951844

17961845
# Don't re-open topology if it's closed and there's no pending cursors.
17971846
if address_to_cursor_ids:

test/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
185185

186186
def __del__(self):
187187
if self._enabled:
188-
print(
189-
'\nERROR: client_knobs still enabled! HEARTBEAT_FREQUENCY=%s, '
188+
msg = (
189+
'ERROR: client_knobs still enabled! HEARTBEAT_FREQUENCY=%s, '
190190
'MIN_HEARTBEAT_INTERVAL=%s, KILL_CURSOR_FREQUENCY=%s, '
191191
'EVENTS_QUEUE_FREQUENCY=%s, stack:\n%s' % (
192192
common.HEARTBEAT_FREQUENCY,
@@ -195,6 +195,7 @@ def __del__(self):
195195
common.EVENTS_QUEUE_FREQUENCY,
196196
self._stack))
197197
self.disable()
198+
raise Exception(msg)
198199

199200

200201
def _all_users(db):

test/test_gridfs.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from pymongo.read_preferences import ReadPreference
3434
from gridfs.errors import CorruptGridFile, FileExists, NoFile
3535
from test.test_replica_set_client import TestReplicaSetClientBase
36+
from gridfs.grid_file import GridOutCursor
3637
from test import (client_context,
3738
unittest,
3839
IntegrationTest)
@@ -429,6 +430,14 @@ def test_gridfs_find(self):
429430
cursor.close()
430431
self.assertRaises(TypeError, self.fs.find, {}, {"_id": True})
431432

433+
def test_delete_not_initialized(self):
434+
# Creating a cursor with invalid arguments will not run __init__
435+
# but will still call __del__.
436+
cursor = GridOutCursor.__new__(GridOutCursor) # Skip calling __init__
437+
with self.assertRaises(TypeError):
438+
cursor.__init__(self.db.fs.files, {}, {"_id": True})
439+
cursor.__del__() # no error
440+
432441
def test_gridfs_find_one(self):
433442
self.assertEqual(None, self.fs.find_one())
434443

0 commit comments

Comments
 (0)