Skip to content

Commit 61d11fe

Browse files
committed
PYTHON-2673 Connection pinning behavior for load balanced clusters (#630)
Tweak spec test because pymongo unpins cursors eagerly after errors. Tweak spec test for PoolClearedEvent ordering when MongoDB handshake fails (see DRIVERS-1785). Only skip killCursors for some error codes. Rely on SDAM error handling to close the connection after a state change error. Add service_id to various events. Retain reference to pinned sockets to prevent preemptive closure by CPython's cyclic GC. (cherry picked from commit c8f32a7)
1 parent 6d3abec commit 61d11fe

23 files changed

+471
-300
lines changed

pymongo/aggregation.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,13 @@ def get_cursor(self, session, server, sock_info, slave_ok):
161161
}
162162

163163
# Create and return cursor instance.
164-
return self._cursor_class(
164+
cmd_cursor = self._cursor_class(
165165
self._cursor_collection(cursor), cursor, sock_info.address,
166166
batch_size=self._batch_size or 0,
167167
max_await_time_ms=self._max_await_time_ms,
168168
session=session, explicit_session=self._explicit_session)
169+
cmd_cursor._maybe_pin_connection(sock_info)
170+
return cmd_cursor
169171

170172

171173
class _CollectionAggregationCommand(_AggregationCommand):

pymongo/change_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def _run_aggregation_cmd(self, session, explicit_session):
181181

182182
return self._client._retryable_read(
183183
cmd.get_cursor, self._target._read_preference_for(session),
184-
session)
184+
session, pin=self._client._should_pin_cursor(session))
185185

186186
def _create_cursor(self):
187187
with self._client._tmp_session(self._session, close=False) as s:

pymongo/client_session.py

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
from bson.timestamp import Timestamp
108108

109109
from pymongo import monotonic
110+
from pymongo.cursor import _SocketManager
110111
from pymongo.errors import (ConfigurationError,
111112
ConnectionFailure,
112113
InvalidOperation,
@@ -116,6 +117,7 @@
116117
from pymongo.helpers import _RETRYABLE_ERROR_CODES
117118
from pymongo.read_concern import ReadConcern
118119
from pymongo.read_preferences import ReadPreference, _ServerMode
120+
from pymongo.server_type import SERVER_TYPE
119121
from pymongo.write_concern import WriteConcern
120122

121123

@@ -291,6 +293,7 @@ def __init__(self, opts):
291293
self.state = _TxnState.NONE
292294
self.sharded = False
293295
self.pinned_address = None
296+
self.sock_mgr = None
294297
self.recovery_token = None
295298
self.attempt = 0
296299

@@ -300,10 +303,29 @@ def active(self):
300303
def starting(self):
301304
return self.state == _TxnState.STARTING
302305

306+
@property
307+
def pinned_conn(self):
308+
if self.active() and self.sock_mgr:
309+
return self.sock_mgr.sock
310+
return None
311+
312+
def pin(self, server, sock_info):
313+
self.sharded = True
314+
self.pinned_address = server.description.address
315+
if server.description.server_type == SERVER_TYPE.LoadBalancer:
316+
sock_info.pinned = True
317+
self.sock_mgr = _SocketManager(sock_info, False)
318+
319+
def unpin(self):
320+
self.pinned_address = None
321+
if self.sock_mgr:
322+
self.sock_mgr.close()
323+
self.sock_mgr = None
324+
303325
def reset(self):
326+
self.unpin()
304327
self.state = _TxnState.NONE
305328
self.sharded = False
306-
self.pinned_address = None
307329
self.recovery_token = None
308330
self.attempt = 0
309331

@@ -374,6 +396,9 @@ def _end_session(self, lock):
374396
try:
375397
if self.in_transaction:
376398
self.abort_transaction()
399+
# It's possible we're still pinned here when the transaction
400+
# is in the committed state when the session is discarded.
401+
self._unpin()
377402
finally:
378403
self._client._return_server_session(self._server_session, lock)
379404
self._server_session = None
@@ -779,14 +804,18 @@ def _pinned_address(self):
779804
return self._transaction.pinned_address
780805
return None
781806

782-
def _pin(self, server):
783-
"""Pin this session to the given Server."""
784-
self._transaction.sharded = True
785-
self._transaction.pinned_address = server.description.address
807+
@property
808+
def _pinned_connection(self):
809+
"""The connection this transaction was started on."""
810+
return self._transaction.pinned_conn
811+
812+
def _pin(self, server, sock_info):
813+
"""Pin this session to the given Server or to the given connection."""
814+
self._transaction.pin(server, sock_info)
786815

787816
def _unpin(self):
788817
"""Unpin this session from any pinned Server."""
789-
self._transaction.pinned_address = None
818+
self._transaction.unpin()
790819

791820
def _txn_read_preference(self):
792821
"""Return read preference of this transaction or None."""
@@ -800,9 +829,6 @@ def _apply_to(self, command, is_retryable, read_preference):
800829
self._server_session.last_use = monotonic.time()
801830
command['lsid'] = self._server_session.session_id
802831

803-
if not self.in_transaction:
804-
self._transaction.reset()
805-
806832
if is_retryable:
807833
command['txnNumber'] = self._server_session.transaction_id
808834
return

pymongo/collection.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,8 @@ def _legacy_write(self, sock_info, name, cmd, op_id,
527527
if publish:
528528
duration = datetime.datetime.now() - start
529529
listeners.publish_command_start(
530-
cmd, self.__database.name, rqst_id, sock_info.address, op_id)
530+
cmd, self.__database.name, rqst_id, sock_info.address, op_id,
531+
sock_info.service_id)
531532
start = datetime.datetime.now()
532533
try:
533534
result = sock_info.legacy_write(rqst_id, msg, max_size, False)
@@ -541,12 +542,14 @@ def _legacy_write(self, sock_info, name, cmd, op_id,
541542
reply = message._convert_write_result(
542543
name, cmd, details)
543544
listeners.publish_command_success(
544-
dur, reply, name, rqst_id, sock_info.address, op_id)
545+
dur, reply, name, rqst_id, sock_info.address,
546+
op_id, sock_info.service_id)
545547
raise
546548
else:
547549
details = message._convert_exception(exc)
548550
listeners.publish_command_failure(
549-
dur, details, name, rqst_id, sock_info.address, op_id)
551+
dur, details, name, rqst_id, sock_info.address, op_id,
552+
sock_info.service_id)
550553
raise
551554
if publish:
552555
if result is not None:
@@ -556,7 +559,8 @@ def _legacy_write(self, sock_info, name, cmd, op_id,
556559
reply = {'ok': 1}
557560
duration = (datetime.datetime.now() - start) + duration
558561
listeners.publish_command_success(
559-
duration, reply, name, rqst_id, sock_info.address, op_id)
562+
duration, reply, name, rqst_id, sock_info.address, op_id,
563+
sock_info.service_id)
560564
return result
561565

562566
def _insert_one(
@@ -2310,9 +2314,9 @@ def _cmd(session, server, sock_info, slave_ok):
23102314
if exc.code != 26:
23112315
raise
23122316
cursor = {'id': 0, 'firstBatch': []}
2313-
return CommandCursor(coll, cursor, sock_info.address,
2314-
session=s,
2315-
explicit_session=session is not None)
2317+
cmd_cursor = CommandCursor(
2318+
coll, cursor, sock_info.address, session=s,
2319+
explicit_session=session is not None)
23162320
else:
23172321
res = message._first_batch(
23182322
sock_info, self.__database.name, "system.indexes",
@@ -2322,10 +2326,13 @@ def _cmd(session, server, sock_info, slave_ok):
23222326
cursor = res["cursor"]
23232327
# Note that a collection can only have 64 indexes, so there
23242328
# will never be a getMore call.
2325-
return CommandCursor(coll, cursor, sock_info.address)
2329+
cmd_cursor = CommandCursor(coll, cursor, sock_info.address)
2330+
cmd_cursor._maybe_pin_connection(sock_info)
2331+
return cmd_cursor
23262332

23272333
return self.__database.client._retryable_read(
2328-
_cmd, read_pref, session)
2334+
_cmd, read_pref, session,
2335+
pin=self.__database.client._should_pin_cursor(session))
23292336

23302337
def index_information(self, session=None):
23312338
"""Get information on this collection's indexes.
@@ -2416,7 +2423,8 @@ def _aggregate(self, aggregation_command, pipeline, cursor_class, session,
24162423
user_fields={'cursor': {'firstBatch': 1}}, use_cursor=use_cursor)
24172424
return self.__database.client._retryable_read(
24182425
cmd.get_cursor, cmd.get_read_preference(session), session,
2419-
retryable=not cmd._performs_write)
2426+
retryable=not cmd._performs_write,
2427+
pin=self.database.client._should_pin_cursor(session))
24202428

24212429
def aggregate(self, pipeline, session=None, **kwargs):
24222430
"""Perform an aggregation using the aggregation framework on this

pymongo/command_cursor.py

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
from bson import _convert_raw_document_lists_to_streams
2020
from bson.py3compat import integer_types
21+
from pymongo.cursor import _SocketManager, _CURSOR_CLOSED_ERRORS
2122
from pymongo.errors import (ConnectionFailure,
2223
InvalidOperation,
23-
NotMasterError,
2424
OperationFailure)
2525
from pymongo.message import (_CursorAddress,
2626
_GetMore,
2727
_RawBatchGetMore)
28+
from pymongo.response import PinnedResponse
2829

2930

3031
class CommandCursor(object):
@@ -38,6 +39,7 @@ def __init__(self, collection, cursor_info, address, retrieved=0,
3839
3940
The parameter 'retrieved' is unused.
4041
"""
42+
self.__sock_mgr = None
4143
self.__collection = collection
4244
self.__id = cursor_info['id']
4345
self.__data = deque(cursor_info['firstBatch'])
@@ -76,11 +78,15 @@ def __die(self, synchronous=False):
7678
self.__address, self.__collection.full_name)
7779
if synchronous:
7880
self.__collection.database.client._close_cursor_now(
79-
self.__id, address, session=self.__session)
81+
self.__id, address, session=self.__session,
82+
sock_mgr=self.__sock_mgr)
8083
else:
8184
# The cursor will be closed later in a different session.
8285
self.__collection.database.client._close_cursor(
8386
self.__id, address)
87+
if self.__sock_mgr:
88+
self.__sock_mgr.close()
89+
self.__sock_mgr = None
8490
self.__end_session(synchronous)
8591

8692
def __end_session(self, synchronous):
@@ -128,52 +134,58 @@ def _post_batch_resume_token(self):
128134
changeStream aggregate or getMore."""
129135
return self.__postbatchresumetoken
130136

137+
def _maybe_pin_connection(self, sock_info):
138+
client = self.__collection.database.client
139+
if not client._should_pin_cursor(self.__session):
140+
return
141+
if not self.__sock_mgr:
142+
sock_mgr = _SocketManager(sock_info, False)
143+
# Ensure the connection gets returned when the entire result is
144+
# returned in the first batch.
145+
if self.__id == 0:
146+
sock_mgr.close()
147+
else:
148+
self.__sock_mgr = sock_mgr
149+
131150
def __send_message(self, operation):
132151
"""Send a getmore message and handle the response.
133152
"""
134-
def kill():
135-
self.__killed = True
136-
self.__end_session(True)
137-
138153
client = self.__collection.database.client
139154
try:
140-
response = client._run_operation_with_response(
155+
response = client._run_operation(
141156
operation, self._unpack_response, address=self.__address)
142-
except OperationFailure:
143-
kill()
144-
raise
145-
except NotMasterError:
146-
# Don't send kill cursors to another server after a "not master"
147-
# error. It's completely pointless.
148-
kill()
157+
except OperationFailure as exc:
158+
if exc.code in _CURSOR_CLOSED_ERRORS:
159+
# Don't send killCursors because the cursor is already closed.
160+
self.__killed = True
161+
# Return the session and pinned connection, if necessary.
162+
self.close()
149163
raise
150164
except ConnectionFailure:
151-
# Don't try to send kill cursors on another socket
152-
# or to another server. It can cause a _pinValue
153-
# assertion on some server releases if we get here
154-
# due to a socket timeout.
155-
kill()
165+
# Don't send killCursors because the cursor is already closed.
166+
self.__killed = True
167+
# Return the session and pinned connection, if necessary.
168+
self.close()
156169
raise
157170
except Exception:
158-
# Close the cursor
159-
self.__die()
171+
self.close()
160172
raise
161173

162-
from_command = response.from_command
163-
reply = response.data
164-
docs = response.docs
165-
166-
if from_command:
167-
cursor = docs[0]['cursor']
174+
if isinstance(response, PinnedResponse):
175+
if not self.__sock_mgr:
176+
self.__sock_mgr = _SocketManager(response.socket_info,
177+
response.more_to_come)
178+
if response.from_command:
179+
cursor = response.docs[0]['cursor']
168180
documents = cursor['nextBatch']
169181
self.__postbatchresumetoken = cursor.get('postBatchResumeToken')
170182
self.__id = cursor['id']
171183
else:
172-
documents = docs
173-
self.__id = reply.cursor_id
184+
documents = response.docs
185+
self.__id = response.data.cursor_id
174186

175187
if self.__id == 0:
176-
kill()
188+
self.__die(True)
177189
self.__data = deque(documents)
178190

179191
def _unpack_response(self, response, cursor_id, codec_options,
@@ -204,10 +216,9 @@ def _refresh(self):
204216
self.__session,
205217
self.__collection.database.client,
206218
self.__max_await_time_ms,
207-
False))
219+
self.__sock_mgr, False))
208220
else: # Cursor id is zero nothing else to return
209-
self.__killed = True
210-
self.__end_session(True)
221+
self.__die(True)
211222

212223
return len(self.__data)
213224

0 commit comments

Comments
 (0)