Skip to content

Commit 611c3f8

Browse files
committed
PYTHON-1677 Connections survive primary stepdown
1 parent 5730284 commit 611c3f8

13 files changed

+357
-114
lines changed

doc/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include:
6565
the buffer protocol.
6666
- Resume tokens can now be accessed from a ``ChangeStream`` cursor using the
6767
:attr:`~pymongo.change_stream.ChangeStream.resume_token` attribute.
68+
- Connections now survive primary step-down. Applications should expect less
69+
socket connection turnover during replica set elections.
6870

6971
.. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst
7072

pymongo/helpers.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,21 @@
2929
WriteConcernError,
3030
WTimeoutError)
3131

32-
# From the Server Discovery and Monitoring spec, the "not master" error codes
33-
# are combined with the "node is recovering" error codes.
32+
# From the SDAM spec, the "node is shutting down" codes.
33+
_SHUTDOWN_CODES = frozenset([
34+
11600, # InterruptedAtShutdown
35+
91, # ShutdownInProgress
36+
])
37+
# From the SDAM spec, the "not master" error codes are combined with the
38+
# "node is recovering" error codes (of which the "node is shutting down"
39+
# errors are a subset).
3440
_NOT_MASTER_CODES = frozenset([
3541
10107, # NotMaster
3642
13435, # NotMasterNoSlaveOk
37-
11600, # InterruptedAtShutdown
3843
11602, # InterruptedDueToReplStateChange
3944
13436, # NotMasterOrSecondary
4045
189, # PrimarySteppedDown
41-
91, # ShutdownInProgress
42-
])
46+
]) | _SHUTDOWN_CODES
4347
# From the retryable writes spec.
4448
_RETRYABLE_ERROR_CODES = _NOT_MASTER_CODES | frozenset([
4549
7, # HostNotFound

pymongo/mongo_client.py

Lines changed: 76 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,9 +1195,11 @@ def _get_topology(self):
11951195

11961196
@contextlib.contextmanager
11971197
def _get_socket(self, server, session, exhaust=False):
1198-
with self._reset_on_error(server.description.address, session):
1199-
with server.get_socket(self.__all_credentials,
1200-
checkout=exhaust) as sock_info:
1198+
with _MongoClientErrorHandler(
1199+
self, server.description.address, session) as err_handler:
1200+
with server.get_socket(
1201+
self.__all_credentials, checkout=exhaust) as sock_info:
1202+
err_handler.contribute_socket(sock_info)
12011203
yield sock_info
12021204

12031205
def _select_server(self, server_selector, session, address=None):
@@ -1289,8 +1291,10 @@ def _run_operation_with_response(self, operation, unpack_res,
12891291
server = self._select_server(
12901292
operation.read_preference, operation.session, address=address)
12911293

1292-
with self._reset_on_error(server.description.address,
1293-
operation.session):
1294+
with _MongoClientErrorHandler(
1295+
self, server.description.address,
1296+
operation.session) as err_handler:
1297+
err_handler.contribute_socket(operation.exhaust_mgr.sock)
12941298
return server.run_operation_with_response(
12951299
operation.exhaust_mgr.sock,
12961300
operation,
@@ -1314,49 +1318,6 @@ def _cmd(session, server, sock_info, slave_ok):
13141318
retryable=isinstance(operation, message._Query),
13151319
exhaust=exhaust)
13161320

1317-
@contextlib.contextmanager
1318-
def _reset_on_error(self, server_address, session):
1319-
"""On "not master" or "node is recovering" errors reset the server
1320-
according to the SDAM spec.
1321-
1322-
Unpin the session on transient transaction errors.
1323-
"""
1324-
try:
1325-
try:
1326-
yield
1327-
except PyMongoError as exc:
1328-
if session and exc.has_error_label(
1329-
"TransientTransactionError"):
1330-
session._unpin_mongos()
1331-
raise
1332-
except NetworkTimeout:
1333-
# The socket has been closed. Don't reset the server.
1334-
# Server Discovery And Monitoring Spec: "When an application
1335-
# operation fails because of any network error besides a socket
1336-
# timeout...."
1337-
if session:
1338-
session._server_session.mark_dirty()
1339-
raise
1340-
except NotMasterError:
1341-
# "When the client sees a "not master" error it MUST replace the
1342-
# server's description with type Unknown. It MUST request an
1343-
# immediate check of the server."
1344-
self._reset_server_and_request_check(server_address)
1345-
raise
1346-
except ConnectionFailure:
1347-
# "Client MUST replace the server's description with type Unknown
1348-
# ... MUST NOT request an immediate check of the server."
1349-
self.__reset_server(server_address)
1350-
if session:
1351-
session._server_session.mark_dirty()
1352-
raise
1353-
except OperationFailure as exc:
1354-
if exc.code in helpers._RETRYABLE_ERROR_CODES:
1355-
# Do not request an immediate check since the server is likely
1356-
# shutting down.
1357-
self.__reset_server(server_address)
1358-
raise
1359-
13601321
def _retry_with_session(self, retryable, func, session, bulk):
13611322
"""Execute an operation with at most one consecutive retries
13621323
@@ -1494,7 +1455,7 @@ def _retryable_write(self, retryable, func, session):
14941455
with self._tmp_session(session) as s:
14951456
return self._retry_with_session(retryable, func, s, None)
14961457

1497-
def __reset_server(self, address):
1458+
def _reset_server(self, address):
14981459
"""Clear our connection pool for a server and mark it Unknown."""
14991460
self._topology.reset_server(address)
15001461

@@ -2158,3 +2119,69 @@ def __next__(self):
21582119
raise TypeError("'MongoClient' object is not iterable")
21592120

21602121
next = __next__
2122+
2123+
2124+
class _MongoClientErrorHandler(object):
2125+
"""Error handler for MongoClient."""
2126+
__slots__ = ('_client', '_server_address', '_session', '_max_wire_version')
2127+
2128+
def __init__(self, client, server_address, session):
2129+
self._client = client
2130+
self._server_address = server_address
2131+
self._session = session
2132+
self._max_wire_version = None
2133+
2134+
def contribute_socket(self, sock_info):
2135+
"""Provide socket information to the error handler."""
2136+
# Currently, we only extract the max_wire_version information.
2137+
self._max_wire_version = sock_info.max_wire_version
2138+
2139+
def __enter__(self):
2140+
return self
2141+
2142+
def __exit__(self, exc_type, exc_val, exc_tb):
2143+
if exc_type is None:
2144+
return
2145+
2146+
if issubclass(exc_type, PyMongoError):
2147+
if self._session and exc_val.has_error_label(
2148+
"TransientTransactionError"):
2149+
self._session._unpin_mongos()
2150+
2151+
if issubclass(exc_type, NetworkTimeout):
2152+
# The socket has been closed. Don't reset the server.
2153+
# Server Discovery And Monitoring Spec: "When an application
2154+
# operation fails because of any network error besides a socket
2155+
# timeout...."
2156+
if self._session:
2157+
self._session._server_session.mark_dirty()
2158+
elif issubclass(exc_type, NotMasterError):
2159+
# As per the SDAM spec if:
2160+
# - the server sees a "not master" error, and
2161+
# - the server is not shutting down, and
2162+
# - the server version is >= 4.2, then
2163+
# we keep the existing connection pool, but mark the server type
2164+
# as Unknown and request an immediate check of the server.
2165+
# Otherwise, we clear the connection pool, mark the server as
2166+
# Unknown and request an immediate check of the server.
2167+
err_code = exc_val.details.get('code', -1)
2168+
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
2169+
if (is_shutting_down or (self._max_wire_version is None) or
2170+
(self._max_wire_version <= 7)):
2171+
# Clear the pool, mark server Unknown and request check.
2172+
self._client._reset_server_and_request_check(
2173+
self._server_address)
2174+
else:
2175+
self._client._topology.mark_server_unknown_and_request_check(
2176+
self._server_address)
2177+
elif issubclass(exc_type, ConnectionFailure):
2178+
# "Client MUST replace the server's description with type Unknown
2179+
# ... MUST NOT request an immediate check of the server."
2180+
self._client._reset_server(self._server_address)
2181+
if self._session:
2182+
self._session._server_session.mark_dirty()
2183+
elif issubclass(exc_type, OperationFailure):
2184+
# Do not request an immediate check since the server is likely
2185+
# shutting down.
2186+
if exc_val.code in helpers._RETRYABLE_ERROR_CODES:
2187+
self._client._reset_server(self._server_address)

pymongo/pool.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,8 @@ def _raise_if_not_writable(self, unacknowledged):
651651
"""
652652
if unacknowledged and not self.is_writable:
653653
# Write won't succeed, bail as if we'd received a not master error.
654-
raise NotMasterError("not master")
654+
raise NotMasterError("not master", {
655+
"ok": 0, "errmsg": "not master", "code": 10107})
655656

656657
def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
657658
"""Send OP_INSERT, etc., optionally returning response as a dict.
@@ -768,6 +769,9 @@ def send_cluster_time(self, command, session, client):
768769
def update_last_checkin_time(self):
769770
self.last_checkin_time = _time()
770771

772+
def update_is_writable(self, is_writable):
773+
self.is_writable = is_writable
774+
771775
def idle_time_seconds(self):
772776
"""Seconds since this socket was last checked into its pool."""
773777
return _time() - self.last_checkin_time
@@ -958,6 +962,8 @@ def __init__(self, address, options, handshake=True):
958962
# Monotonically increasing connection ID required for CMAP Events.
959963
self.next_connection_id = 1
960964
self.closed = False
965+
# Track whether the sockets in this pool are writeable or not.
966+
self.is_writable = None
961967

962968
# Keep track of resets, so we notice sockets created before the most
963969
# recent reset and close them.
@@ -1012,6 +1018,15 @@ def _reset(self, close):
10121018
for sock_info in sockets:
10131019
sock_info.close_socket(ConnectionClosedReason.STALE)
10141020

1021+
def update_is_writable(self, is_writable):
1022+
"""Updates the is_writable attribute on all sockets currently in the
1023+
Pool.
1024+
"""
1025+
self.is_writable = is_writable
1026+
with self.lock:
1027+
for socket in self.sockets:
1028+
socket.update_is_writable(self.is_writable)
1029+
10151030
def reset(self):
10161031
self._reset(close=False)
10171032

@@ -1075,6 +1090,7 @@ def connect(self):
10751090
sock_info = SocketInfo(sock, self, self.address, conn_id)
10761091
if self.handshake:
10771092
sock_info.ismaster(self.opts.metadata, None)
1093+
self.is_writable = sock_info.is_writable
10781094

10791095
return sock_info
10801096

@@ -1194,6 +1210,7 @@ def return_socket(self, sock_info, publish_checkin=True):
11941210
sock_info.close_socket(ConnectionClosedReason.STALE)
11951211
elif not sock_info.closed:
11961212
sock_info.update_last_checkin_time()
1213+
sock_info.update_is_writable(self.is_writable)
11971214
with self.lock:
11981215
self.sockets.appendleft(sock_info)
11991216

pymongo/topology.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,18 @@ def reset_server(self, address):
409409
Do *not* request an immediate check.
410410
"""
411411
with self._lock:
412-
self._reset_server(address)
412+
self._reset_server(address, reset_pool=True)
413413

414414
def reset_server_and_request_check(self, address):
415415
"""Clear our pool for a server, mark it Unknown, and check it soon."""
416416
with self._lock:
417-
self._reset_server(address)
417+
self._reset_server(address, reset_pool=True)
418+
self._request_check(address)
419+
420+
def mark_server_unknown_and_request_check(self, address):
421+
"""Mark a server Unknown, and check it soon."""
422+
with self._lock:
423+
self._reset_server(address, reset_pool=False)
418424
self._request_check(address)
419425

420426
def update_pool(self):
@@ -523,16 +529,17 @@ def _ensure_opened(self):
523529
for server in itervalues(self._servers):
524530
server.open()
525531

526-
def _reset_server(self, address):
527-
"""Clear our pool for a server and mark it Unknown.
532+
def _reset_server(self, address, reset_pool):
533+
"""Mark a server Unknown and optionally reset it's pool.
528534
529535
Hold the lock when calling this. Does *not* request an immediate check.
530536
"""
531537
server = self._servers.get(address)
532538

533539
# "server" is None if another thread removed it from the topology.
534540
if server:
535-
server.reset()
541+
if reset_pool:
542+
server.reset()
536543

537544
# Mark this server Unknown.
538545
self._description = self._description.reset_server(address)
@@ -578,7 +585,14 @@ def _update_servers(self):
578585
self._servers[address] = server
579586
server.open()
580587
else:
588+
# Cache old is_writable value.
589+
was_writable = self._servers[address].description.is_writable
590+
# Update server description.
581591
self._servers[address].description = sd
592+
# Update is_writable value of the pool, if it changed.
593+
if was_writable != sd.is_writable:
594+
self._servers[address].pool.update_is_writable(
595+
sd.is_writable)
582596

583597
for address, server in list(self._servers.items()):
584598
if not self._description.has_server(address):

test/__init__.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ def __init__(self):
159159
"""Create a client and grab essential information from the server."""
160160
self.connection_attempts = []
161161
self.connected = False
162-
self.ismaster = {}
163162
self.w = None
164163
self.nodes = set()
165164
self.replica_set_name = None
@@ -184,6 +183,10 @@ def __init__(self):
184183
if COMPRESSORS:
185184
self.default_client_options["compressors"] = COMPRESSORS
186185

186+
@property
187+
def ismaster(self):
188+
return self.client.admin.command('isMaster')
189+
187190
def _connect(self, host, port, **kwargs):
188191
# Jython takes a long time to connect.
189192
if sys.platform.startswith('java'):
@@ -253,7 +256,7 @@ def _init_client(self):
253256
self.cmd_line = self.client.admin.command('getCmdLineOpts')
254257

255258
self.server_status = self.client.admin.command('serverStatus')
256-
self.ismaster = ismaster = self.client.admin.command('isMaster')
259+
ismaster = self.ismaster
257260
self.sessions_enabled = 'logicalSessionTimeoutMinutes' in ismaster
258261

259262
if 'setName' in ismaster:
@@ -276,18 +279,17 @@ def _init_client(self):
276279
**self.default_client_options)
277280

278281
# Get the authoritative ismaster result from the primary.
279-
self.ismaster = self.client.admin.command('ismaster')
282+
ismaster = self.ismaster
280283
nodes = [partition_node(node.lower())
281-
for node in self.ismaster.get('hosts', [])]
284+
for node in ismaster.get('hosts', [])]
282285
nodes.extend([partition_node(node.lower())
283-
for node in self.ismaster.get('passives', [])])
286+
for node in ismaster.get('passives', [])])
284287
nodes.extend([partition_node(node.lower())
285-
for node in self.ismaster.get('arbiters', [])])
288+
for node in ismaster.get('arbiters', [])])
286289
self.nodes = set(nodes)
287290
else:
288-
self.ismaster = ismaster
289291
self.nodes = set([(host, port)])
290-
self.w = len(self.ismaster.get("hosts", [])) or 1
292+
self.w = len(ismaster.get("hosts", [])) or 1
291293
self.version = Version.from_client(self.client)
292294

293295
if 'enableTestCommands=1' in self.cmd_line['argv']:

0 commit comments

Comments
 (0)