Skip to content

Commit e1731f7

Browse files
committed
PYTHON-2677 Better wait queue timeout errors for load balanced clusters (#639)
Remove checkout argument in favor of SocketInfo.pin_txn/pin_cursor() (cherry picked from commit 4c77d7c)
1 parent e31a981 commit e1731f7

15 files changed

+243
-62
lines changed

pymongo/change_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def _run_aggregation_cmd(self, session, explicit_session):
181181

182182
return self._client._retryable_read(
183183
cmd.get_cursor, self._target._read_preference_for(session),
184-
session, pin=self._client._should_pin_cursor(session))
184+
session)
185185

186186
def _create_cursor(self):
187187
with self._client._tmp_session(self._session, close=False) as s:

pymongo/client_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def pin(self, server, sock_info):
313313
self.sharded = True
314314
self.pinned_address = server.description.address
315315
if server.description.server_type == SERVER_TYPE.LoadBalancer:
316-
sock_info.pinned = True
316+
sock_info.pin_txn()
317317
self.sock_mgr = _SocketManager(sock_info, False)
318318

319319
def unpin(self):

pymongo/collection.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2331,8 +2331,7 @@ def _cmd(session, server, sock_info, slave_ok):
23312331
return cmd_cursor
23322332

23332333
return self.__database.client._retryable_read(
2334-
_cmd, read_pref, session,
2335-
pin=self.__database.client._should_pin_cursor(session))
2334+
_cmd, read_pref, session)
23362335

23372336
def index_information(self, session=None):
23382337
"""Get information on this collection's indexes.
@@ -2423,8 +2422,7 @@ def _aggregate(self, aggregation_command, pipeline, cursor_class, session,
24232422
user_fields={'cursor': {'firstBatch': 1}}, use_cursor=use_cursor)
24242423
return self.__database.client._retryable_read(
24252424
cmd.get_cursor, cmd.get_read_preference(session), session,
2426-
retryable=not cmd._performs_write,
2427-
pin=self.database.client._should_pin_cursor(session))
2425+
retryable=not cmd._performs_write)
24282426

24292427
def aggregate(self, pipeline, session=None, **kwargs):
24302428
"""Perform an aggregation using the aggregation framework on this

pymongo/command_cursor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ def _maybe_pin_connection(self, sock_info):
139139
if not client._should_pin_cursor(self.__session):
140140
return
141141
if not self.__sock_mgr:
142+
sock_info.pin_cursor()
142143
sock_mgr = _SocketManager(sock_info, False)
143144
# Ensure the connection gets returned when the entire result is
144145
# returned in the first batch.

pymongo/database.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,7 @@ def aggregate(self, pipeline, session=None, **kwargs):
525525
user_fields={'cursor': {'firstBatch': 1}})
526526
return self.client._retryable_read(
527527
cmd.get_cursor, cmd.get_read_preference(s), s,
528-
retryable=not cmd._performs_write,
529-
pin=self.client._should_pin_cursor(s))
528+
retryable=not cmd._performs_write)
530529

531530
def watch(self, pipeline=None, full_document=None, resume_after=None,
532531
max_await_time_ms=None, batch_size=None, collation=None,
@@ -839,8 +838,7 @@ def _cmd(session, server, sock_info, slave_okay):
839838
**kwargs)
840839

841840
return self.__client._retryable_read(
842-
_cmd, read_pref, session,
843-
pin=self.client._should_pin_cursor(session))
841+
_cmd, read_pref, session)
844842

845843
def list_collection_names(self, session=None, filter=None, **kwargs):
846844
"""Get a list of all the collection names in this database.

pymongo/mongo_client.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,16 +1291,15 @@ def _get_topology(self):
12911291
return self._topology
12921292

12931293
@contextlib.contextmanager
1294-
def _get_socket(self, server, session, pin=False):
1294+
def _get_socket(self, server, session):
12951295
in_txn = session and session.in_transaction
12961296
with _MongoClientErrorHandler(self, server, session) as err_handler:
12971297
# Reuse the pinned connection, if it exists.
12981298
if in_txn and session._pinned_connection:
12991299
yield session._pinned_connection
13001300
return
13011301
with server.get_socket(
1302-
self.__all_credentials, checkout=pin,
1303-
handler=err_handler) as sock_info:
1302+
self.__all_credentials, handler=err_handler) as sock_info:
13041303
# Pin this session to the selected server or connection.
13051304
if (in_txn and server.description.server_type in (
13061305
SERVER_TYPE.Mongos, SERVER_TYPE.LoadBalancer)):
@@ -1351,7 +1350,7 @@ def _socket_for_writes(self, session):
13511350
return self._get_socket(server, session)
13521351

13531352
@contextlib.contextmanager
1354-
def _slaveok_for_server(self, read_preference, server, session, pin=False):
1353+
def _slaveok_for_server(self, read_preference, server, session):
13551354
assert read_preference is not None, "read_preference must not be None"
13561355
# Get a socket for a server matching the read preference, and yield
13571356
# sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
@@ -1362,7 +1361,7 @@ def _slaveok_for_server(self, read_preference, server, session, pin=False):
13621361
topology = self._get_topology()
13631362
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
13641363

1365-
with self._get_socket(server, session, pin=pin) as sock_info:
1364+
with self._get_socket(server, session) as sock_info:
13661365
slave_ok = (single and not sock_info.is_mongos) or (
13671366
read_preference != ReadPreference.PRIMARY)
13681367
yield sock_info, slave_ok
@@ -1389,7 +1388,7 @@ def _should_pin_cursor(self, session):
13891388
return (self.__options.load_balanced and
13901389
not (session and session.in_transaction))
13911390

1392-
def _run_operation(self, operation, unpack_res, pin=False, address=None):
1391+
def _run_operation(self, operation, unpack_res, address=None):
13931392
"""Run a _Query/_GetMore operation and return a Response.
13941393
13951394
:Parameters:
@@ -1398,7 +1397,6 @@ def _run_operation(self, operation, unpack_res, pin=False, address=None):
13981397
- `address` (optional): Optional address when sending a message
13991398
to a specific server, used for getMore.
14001399
"""
1401-
pin = self._should_pin_cursor(operation.session) or operation.exhaust
14021400
if operation.sock_mgr:
14031401
server = self._select_server(
14041402
operation.read_preference, operation.session, address=address)
@@ -1409,17 +1407,16 @@ def _run_operation(self, operation, unpack_res, pin=False, address=None):
14091407
err_handler.contribute_socket(operation.sock_mgr.sock)
14101408
return server.run_operation(
14111409
operation.sock_mgr.sock, operation, True,
1412-
self._event_listeners, pin, unpack_res)
1410+
self._event_listeners, unpack_res)
14131411

14141412
def _cmd(session, server, sock_info, slave_ok):
14151413
return server.run_operation(
1416-
sock_info, operation, slave_ok, self._event_listeners, pin,
1414+
sock_info, operation, slave_ok, self._event_listeners,
14171415
unpack_res)
14181416

14191417
return self._retryable_read(
14201418
_cmd, operation.read_preference, operation.session,
1421-
address=address, retryable=isinstance(operation, message._Query),
1422-
pin=pin)
1419+
address=address, retryable=isinstance(operation, message._Query))
14231420

14241421
def _retry_with_session(self, retryable, func, session, bulk):
14251422
"""Execute an operation with at most one consecutive retries
@@ -1491,7 +1488,7 @@ def is_retrying():
14911488
last_error = exc
14921489

14931490
def _retryable_read(self, func, read_pref, session, address=None,
1494-
retryable=True, pin=False):
1491+
retryable=True):
14951492
"""Execute an operation with at most one consecutive retries
14961493
14971494
Returns func()'s return value on success. On error retries the same
@@ -1511,8 +1508,7 @@ def _retryable_read(self, func, read_pref, session, address=None,
15111508
read_pref, session, address=address)
15121509
if not server.description.retryable_reads_supported:
15131510
retryable = False
1514-
with self._slaveok_for_server(
1515-
read_pref, server, session, pin=pin) as (
1511+
with self._slaveok_for_server(read_pref, server, session) as (
15161512
sock_info, slave_ok):
15171513
if retrying and not retryable:
15181514
# A retry is not possible because this server does

pymongo/pool.py

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,19 @@ def __init__(self, sock, pool, address, id):
566566
self.service_id = None
567567
# When executing a transaction in load balancing mode, this flag is
568568
# set to true to indicate that the session now owns the connection.
569-
self.pinned = False
569+
self.pinned_txn = False
570+
self.pinned_cursor = False
571+
self.active = False
572+
573+
def pin_txn(self):
574+
self.pinned_txn = True
575+
assert not self.pinned_cursor
576+
577+
def pin_cursor(self):
578+
self.pinned_cursor = True
579+
assert not self.pinned_txn
570580

571581
def unpin(self):
572-
self.pinned = False
573582
pool = self.pool_ref()
574583
if pool:
575584
pool.return_socket(self)
@@ -1190,6 +1199,8 @@ def __init__(self, address, options, handshake=True):
11901199
# from thinking that a cursor's pinned connection can be GC'd when the
11911200
# cursor is GC'd (see PYTHON-2751).
11921201
self.__pinned_sockets = set()
1202+
self.ncursors = 0
1203+
self.ntxns = 0
11931204

11941205
def _reset(self, close, service_id=None):
11951206
with self.lock:
@@ -1327,15 +1338,15 @@ def connect(self, all_credentials=None):
13271338
return sock_info
13281339

13291340
@contextlib.contextmanager
1330-
def get_socket(self, all_credentials, checkout=False, handler=None):
1341+
def get_socket(self, all_credentials, handler=None):
13311342
"""Get a socket from the pool. Use with a "with" statement.
13321343
13331344
Returns a :class:`SocketInfo` object wrapping a connected
13341345
:class:`socket.socket`.
13351346
13361347
This method should always be used in a with-statement::
13371348
1338-
with pool.get_socket(credentials, checkout) as socket_info:
1349+
with pool.get_socket(credentials) as socket_info:
13391350
socket_info.send_message(msg)
13401351
data = socket_info.receive_message(op_code, request_id)
13411352
@@ -1347,17 +1358,13 @@ def get_socket(self, all_credentials, checkout=False, handler=None):
13471358
13481359
:Parameters:
13491360
- `all_credentials`: dict, maps auth source to MongoCredential.
1350-
- `checkout` (optional): keep socket checked out.
13511361
- `handler` (optional): A _MongoClientErrorHandler.
13521362
"""
13531363
listeners = self.opts.event_listeners
13541364
if self.enabled_for_cmap:
13551365
listeners.publish_connection_check_out_started(self.address)
13561366

13571367
sock_info = self._get_socket(all_credentials)
1358-
if checkout:
1359-
self.__pinned_sockets.add(sock_info)
1360-
13611368
if self.enabled_for_cmap:
13621369
listeners.publish_connection_checked_out(
13631370
self.address, sock_info.id)
@@ -1368,20 +1375,25 @@ def get_socket(self, all_credentials, checkout=False, handler=None):
13681375
# Note that when pinned is True, the session owns the
13691376
# connection and it is responsible for checking the connection
13701377
# back into the pool.
1371-
pinned = sock_info.pinned
1378+
pinned = sock_info.pinned_txn or sock_info.pinned_cursor
13721379
if handler:
13731380
# Perform SDAM error handling rules while the connection is
13741381
# still checked out.
13751382
exc_type, exc_val, _ = sys.exc_info()
13761383
handler.handle(exc_type, exc_val)
1377-
if not pinned:
1384+
if not pinned and sock_info.active:
13781385
self.return_socket(sock_info)
13791386
raise
1380-
else:
1381-
if sock_info.pinned:
1387+
if sock_info.pinned_txn:
1388+
with self.lock:
13821389
self.__pinned_sockets.add(sock_info)
1383-
elif not checkout:
1384-
self.return_socket(sock_info)
1390+
self.ntxns += 1
1391+
elif sock_info.pinned_cursor:
1392+
with self.lock:
1393+
self.__pinned_sockets.add(sock_info)
1394+
self.ncursors += 1
1395+
elif sock_info.active:
1396+
self.return_socket(sock_info)
13851397

13861398
def _get_socket(self, all_credentials):
13871399
"""Get or create a SocketInfo. Can raise ConnectionFailure."""
@@ -1438,6 +1450,7 @@ def _get_socket(self, all_credentials):
14381450
self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
14391451
raise
14401452

1453+
sock_info.active = True
14411454
return sock_info
14421455

14431456
def return_socket(self, sock_info):
@@ -1446,8 +1459,12 @@ def return_socket(self, sock_info):
14461459
:Parameters:
14471460
- `sock_info`: The socket to check into the pool.
14481461
"""
1462+
txn = sock_info.pinned_txn
1463+
cursor = sock_info.pinned_cursor
1464+
sock_info.active = False
1465+
sock_info.pinned_txn = False
1466+
sock_info.pinned_cursor = False
14491467
self.__pinned_sockets.discard(sock_info)
1450-
sock_info.pinned = False
14511468
listeners = self.opts.event_listeners
14521469
if self.enabled_for_cmap:
14531470
listeners.publish_connection_checked_in(self.address, sock_info.id)
@@ -1464,7 +1481,6 @@ def return_socket(self, sock_info):
14641481
ConnectionClosedReason.ERROR)
14651482
else:
14661483
with self.lock:
1467-
assert sock_info not in self.sockets
14681484
# Hold the lock to ensure this section does not race with
14691485
# Pool.reset().
14701486
if self.stale_generation(sock_info.generation,
@@ -1477,6 +1493,10 @@ def return_socket(self, sock_info):
14771493

14781494
self._socket_semaphore.release()
14791495
with self.lock:
1496+
if txn:
1497+
self.ntxns -= 1
1498+
elif cursor:
1499+
self.ncursors -= 1
14801500
self.active_sockets -= 1
14811501

14821502
def _perished(self, sock_info):
@@ -1518,9 +1538,18 @@ def _raise_wait_queue_timeout(self):
15181538
if self.enabled_for_cmap:
15191539
listeners.publish_connection_check_out_failed(
15201540
self.address, ConnectionCheckOutFailedReason.TIMEOUT)
1541+
if self.opts.load_balanced:
1542+
other_ops = self.active_sockets - self.ncursors - self.ntxns
1543+
raise ConnectionFailure(
1544+
'Timeout waiting for connection from the connection pool. '
1545+
'maxPoolSize: %s, connections in use by cursors: %s, '
1546+
'connections in use by transactions: %s, connections in use '
1547+
'by other operations: %s, wait_queue_timeout: %s' % (
1548+
self.opts.max_pool_size, self.ncursors, self.ntxns,
1549+
other_ops, self.opts.wait_queue_timeout))
15211550
raise ConnectionFailure(
1522-
'Timed out while checking out a connection from connection pool '
1523-
'with max_size %r and wait_queue_timeout %r' % (
1551+
'Timed out while checking out a connection from connection pool. '
1552+
'maxPoolSize: %s, wait_queue_timeout: %s' % (
15241553
self.opts.max_pool_size, self.opts.wait_queue_timeout))
15251554

15261555
def __del__(self):

pymongo/server.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def request_check(self):
6969
self._monitor.request_check()
7070

7171
def run_operation(self, sock_info, operation, set_slave_okay, listeners,
72-
pin, unpack_res):
72+
unpack_res):
7373
"""Run a _Query or _GetMore operation and return a Response object.
7474
7575
This method is used only to run _Query/_GetMore operations from
@@ -81,7 +81,6 @@ def run_operation(self, sock_info, operation, set_slave_okay, listeners,
8181
- `set_slave_okay`: Pass to operation.get_message.
8282
- `all_credentials`: dict, maps auth source to MongoCredential.
8383
- `listeners`: Instance of _EventListeners or None.
84-
- `pin`: If True, then this is a pinned cursor operation.
8584
- `unpack_res`: A callable that decodes the wire protocol response.
8685
"""
8786
duration = None
@@ -170,7 +169,8 @@ def run_operation(self, sock_info, operation, set_slave_okay, listeners,
170169
docs = _decode_all_selective(
171170
decrypted, operation.codec_options, user_fields)
172171

173-
if pin:
172+
if client._should_pin_cursor(operation.session) or operation.exhaust:
173+
sock_info.pin_cursor()
174174
if isinstance(reply, _OpMsg):
175175
# In OP_MSG, the server keeps sending only if the
176176
# more_to_come flag is set.
@@ -200,8 +200,8 @@ def run_operation(self, sock_info, operation, set_slave_okay, listeners,
200200

201201
return response
202202

203-
def get_socket(self, all_credentials, checkout=False, handler=None):
204-
return self.pool.get_socket(all_credentials, checkout, handler)
203+
def get_socket(self, all_credentials, handler=None):
204+
return self.pool.get_socket(all_credentials, handler)
205205

206206
@property
207207
def description(self):

0 commit comments

Comments
 (0)