Skip to content

Commit 7099e1b

Browse files
committed
PYTHON-2199 Reduce race conditions in SDAM error handling
Use Pool.generation and topologyVersion to reduce race conditions SDAM error handling. Implement SDAM error handling spec tests.
1 parent 4c727fd commit 7099e1b

File tree

78 files changed

+9060
-120
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+9060
-120
lines changed

pymongo/database.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from pymongo.errors import (CollectionInvalid,
3030
ConfigurationError,
3131
InvalidName,
32-
NotMasterError,
3332
OperationFailure)
3433
from pymongo.message import _first_batch
3534
from pymongo.read_preferences import ReadPreference
@@ -1158,8 +1157,7 @@ def error(self):
11581157
# doing so already.
11591158
primary = self.__client.primary
11601159
if primary:
1161-
self.__client._reset_server_and_request_check(
1162-
primary, NotMasterError(error_msg, error))
1160+
self.__client._handle_getlasterror(primary, error_msg)
11631161
return error
11641162

11651163
def last_status(self):

pymongo/ismaster.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,7 @@ def sasl_supported_mechs(self):
168168
169169
"""
170170
return self._doc.get('saslSupportedMechs', [])
171+
172+
@property
173+
def topology_version(self):
174+
return self._doc.get('topologyVersion')

pymongo/mongo_client.py

Lines changed: 21 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,15 @@
5959
ConfigurationError,
6060
ConnectionFailure,
6161
InvalidOperation,
62-
NetworkTimeout,
63-
NotMasterError,
6462
OperationFailure,
6563
PyMongoError,
6664
ServerSelectionTimeoutError)
6765
from pymongo.read_preferences import ReadPreference
6866
from pymongo.server_selectors import (writable_preferred_server_selector,
6967
writable_server_selector)
7068
from pymongo.server_type import SERVER_TYPE
71-
from pymongo.topology import Topology
69+
from pymongo.topology import (Topology,
70+
_ErrorContext)
7271
from pymongo.topology_description import TOPOLOGY_TYPE
7372
from pymongo.settings import TopologySettings
7473
from pymongo.uri_parser import (_handle_option_deprecations,
@@ -1225,8 +1224,7 @@ def _get_topology(self):
12251224

12261225
@contextlib.contextmanager
12271226
def _get_socket(self, server, session, exhaust=False):
1228-
with _MongoClientErrorHandler(
1229-
self, server.description.address, session) as err_handler:
1227+
with _MongoClientErrorHandler(self, server, session) as err_handler:
12301228
with server.get_socket(
12311229
self.__all_credentials, checkout=exhaust) as sock_info:
12321230
err_handler.contribute_socket(sock_info)
@@ -1328,8 +1326,7 @@ def _run_operation_with_response(self, operation, unpack_res,
13281326
operation.read_preference, operation.session, address=address)
13291327

13301328
with _MongoClientErrorHandler(
1331-
self, server.description.address,
1332-
operation.session) as err_handler:
1329+
self, server, operation.session) as err_handler:
13331330
err_handler.contribute_socket(operation.exhaust_mgr.sock)
13341331
return server.run_operation_with_response(
13351332
operation.exhaust_mgr.sock,
@@ -1499,9 +1496,9 @@ def _retryable_write(self, retryable, func, session):
14991496
with self._tmp_session(session) as s:
15001497
return self._retry_with_session(retryable, func, s, None)
15011498

1502-
def _reset_server_and_request_check(self, address, error):
1499+
def _handle_getlasterror(self, address, error_msg):
15031500
"""Clear our pool for a server, mark it Unknown, and check it soon."""
1504-
self._topology.reset_server_and_request_check(address, error)
1501+
self._topology.handle_getlasterror(address, error_msg)
15051502

15061503
def __eq__(self, other):
15071504
if isinstance(other, self.__class__):
@@ -2167,18 +2164,24 @@ def __next__(self):
21672164

21682165
class _MongoClientErrorHandler(object):
21692166
"""Error handler for MongoClient."""
2170-
__slots__ = ('_client', '_server_address', '_session', '_max_wire_version')
2167+
__slots__ = ('_client', '_server_address', '_session',
2168+
'_max_wire_version', '_sock_generation')
21712169

2172-
def __init__(self, client, server_address, session):
2170+
def __init__(self, client, server, session):
21732171
self._client = client
2174-
self._server_address = server_address
2172+
self._server_address = server.description.address
21752173
self._session = session
21762174
self._max_wire_version = common.MIN_WIRE_VERSION
2175+
# XXX: When get_socket fails, this generation could be out of date:
2176+
# "Note that when a network error occurs before the handshake
2177+
# completes then the error's generation number is the generation
2178+
# of the pool at the time the connection attempt was started."
2179+
self._sock_generation = server.pool.generation
21772180

21782181
def contribute_socket(self, sock_info):
21792182
"""Provide socket information to the error handler."""
2180-
# Currently, we only extract the max_wire_version information.
21812183
self._max_wire_version = sock_info.max_wire_version
2184+
self._sock_generation = sock_info.generation
21822185

21832186
def __enter__(self):
21842187
return self
@@ -2187,45 +2190,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
21872190
if exc_type is None:
21882191
return
21892192

2193+
err_ctx = _ErrorContext(
2194+
exc_val, self._max_wire_version, self._sock_generation)
2195+
self._client._topology.handle_error(self._server_address, err_ctx)
2196+
21902197
if issubclass(exc_type, PyMongoError):
21912198
if self._session and exc_val.has_error_label(
21922199
"TransientTransactionError"):
21932200
self._session._unpin_mongos()
21942201

2195-
if issubclass(exc_type, NetworkTimeout):
2196-
# The socket has been closed. Don't reset the server.
2197-
# Server Discovery And Monitoring Spec: "When an application
2198-
# operation fails because of any network error besides a socket
2199-
# timeout...."
2200-
if self._session:
2201-
self._session._server_session.mark_dirty()
2202-
elif issubclass(exc_type, NotMasterError):
2203-
# As per the SDAM spec if:
2204-
# - the server sees a "not master" error, and
2205-
# - the server is not shutting down, and
2206-
# - the server version is >= 4.2, then
2207-
# we keep the existing connection pool, but mark the server type
2208-
# as Unknown and request an immediate check of the server.
2209-
# Otherwise, we clear the connection pool, mark the server as
2210-
# Unknown and request an immediate check of the server.
2211-
err_code = exc_val.details.get('code', -1)
2212-
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
2213-
if is_shutting_down or (self._max_wire_version <= 7):
2214-
# Clear the pool, mark server Unknown and request check.
2215-
self._client._reset_server_and_request_check(
2216-
self._server_address, exc_val)
2217-
else:
2218-
self._client._topology.mark_server_unknown_and_request_check(
2219-
self._server_address, exc_val)
2220-
elif issubclass(exc_type, ConnectionFailure):
2221-
# "Client MUST replace the server's description with type Unknown
2222-
# ... MUST NOT request an immediate check of the server."
2223-
self._client._topology.reset_server(self._server_address, exc_val)
2202+
if issubclass(exc_type, ConnectionFailure):
22242203
if self._session:
22252204
self._session._server_session.mark_dirty()
2226-
elif issubclass(exc_type, OperationFailure):
2227-
# Do not request an immediate check since the server is likely
2228-
# shutting down.
2229-
if exc_val.code in helpers._RETRYABLE_ERROR_CODES:
2230-
self._client._topology.reset_server(
2231-
self._server_address, exc_val)

pymongo/server_description.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class ServerDescription(object):
3636
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
3737
'_round_trip_time', '_me', '_is_writable', '_is_readable',
3838
'_ls_timeout_minutes', '_error', '_set_version', '_election_id',
39-
'_cluster_time', '_last_write_date', '_last_update_time')
39+
'_cluster_time', '_last_write_date', '_last_update_time',
40+
'_topology_version')
4041

4142
def __init__(
4243
self,
@@ -68,6 +69,10 @@ def __init__(
6869
self._me = ismaster.me
6970
self._last_update_time = _time()
7071
self._error = error
72+
self._topology_version = ismaster.topology_version
73+
if error:
74+
if hasattr(error, 'details') and isinstance(error.details, dict):
75+
self._topology_version = error.details.get('topologyVersion')
7176

7277
if ismaster.last_write_date:
7378
# Convert from datetime to seconds.
@@ -207,6 +212,15 @@ def retryable_reads_supported(self):
207212
"""Checks if this server supports retryable writes."""
208213
return self._max_wire_version >= 6
209214

215+
@property
216+
def topology_version(self):
217+
return self._topology_version
218+
219+
def to_unknown(self):
220+
unknown = ServerDescription(self.address)
221+
unknown._topology_version = self.topology_version
222+
return unknown
223+
210224
def __eq__(self, other):
211225
if isinstance(other, ServerDescription):
212226
return ((self._address == other.address) and

pymongo/topology.py

Lines changed: 117 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@
2626
else:
2727
import Queue
2828

29-
from pymongo import common
30-
from pymongo import periodic_executor
29+
from pymongo import (common,
30+
helpers,
31+
periodic_executor)
3132
from pymongo.pool import PoolOptions
3233
from pymongo.topology_description import (updated_topology_description,
3334
_updated_topology_description_srv_polling,
3435
TopologyDescription,
3536
SRV_POLLING_TOPOLOGIES, TOPOLOGY_TYPE)
36-
from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError
37+
from pymongo.errors import (ConnectionFailure,
38+
ConfigurationError,
39+
NetworkTimeout,
40+
NotMasterError,
41+
OperationFailure,
42+
ServerSelectionTimeoutError)
3743
from pymongo.monitor import SrvMonitor
3844
from pymongo.monotonic import time as _time
3945
from pymongo.server import Server
@@ -264,14 +270,17 @@ def _process_change(self, server_description):
264270
Hold the lock when calling this.
265271
"""
266272
td_old = self._description
267-
old_server_description = td_old._server_descriptions[
268-
server_description.address]
273+
sd_old = td_old._server_descriptions[server_description.address]
274+
if _is_stale_server_description(sd_old, server_description):
275+
# This is a stale isMaster response. Ignore it.
276+
return
277+
269278
suppress_event = ((self._publish_server or self._publish_tp)
270-
and old_server_description == server_description)
279+
and sd_old == server_description)
271280
if self._publish_server and not suppress_event:
272281
self._events.put((
273282
self._listeners.publish_server_description_changed,
274-
(old_server_description, server_description,
283+
(sd_old, server_description,
275284
server_description.address, self._topology_id)))
276285

277286
self._description = updated_topology_description(
@@ -410,25 +419,15 @@ def reset_pool(self, address):
410419
if server:
411420
server.pool.reset()
412421

413-
def reset_server(self, address, error):
414-
"""Clear our pool for a server and mark it Unknown.
415-
416-
Do *not* request an immediate check.
417-
"""
418-
with self._lock:
419-
self._reset_server(address, reset_pool=True, error=error)
420-
421-
def reset_server_and_request_check(self, address, error):
422+
def handle_getlasterror(self, address, error_msg):
422423
"""Clear our pool for a server, mark it Unknown, and check it soon."""
424+
error = NotMasterError(error_msg, {'code': 10107, 'errmsg': error_msg})
423425
with self._lock:
424-
self._reset_server(address, reset_pool=True, error=error)
425-
self._request_check(address)
426-
427-
def mark_server_unknown_and_request_check(self, address, error):
428-
"""Mark a server Unknown, and check it soon."""
429-
with self._lock:
430-
self._reset_server(address, reset_pool=False, error=error)
431-
self._request_check(address)
426+
server = self._servers.get(address)
427+
if server:
428+
self._process_change(ServerDescription(address, error=error))
429+
server.pool.reset()
430+
server.request_check()
432431

433432
def update_pool(self, all_credentials):
434433
# Remove any stale sockets and add new sockets if pool is too small.
@@ -540,28 +539,78 @@ def _ensure_opened(self):
540539
for server in itervalues(self._servers):
541540
server.open()
542541

543-
def _reset_server(self, address, reset_pool, error):
544-
"""Mark a server Unknown and optionally reset it's pool.
545-
546-
Hold the lock when calling this. Does *not* request an immediate check.
547-
"""
542+
def _is_stale_error(self, address, err_ctx):
548543
server = self._servers.get(address)
549-
550-
# "server" is None if another thread removed it from the topology.
551-
if server:
552-
if reset_pool:
544+
if server is None:
545+
# Another thread removed this server from the topology.
546+
return True
547+
548+
if err_ctx.sock_generation != server._pool.generation:
549+
# This is an outdated error from a previous pool version.
550+
return True
551+
552+
# topologyVersion check, ignore error when cur_tv >= error_tv:
553+
cur_tv = server.description.topology_version
554+
error = err_ctx.error
555+
error_tv = None
556+
if error and hasattr(error, 'details'):
557+
if isinstance(error.details, dict):
558+
error_tv = error.details.get('topologyVersion')
559+
560+
return _is_stale_error_topology_version(cur_tv, error_tv)
561+
562+
def _handle_error(self, address, err_ctx):
563+
if self._is_stale_error(address, err_ctx):
564+
return
565+
566+
server = self._servers[address]
567+
error = err_ctx.error
568+
exc_type = type(error)
569+
if issubclass(exc_type, NetworkTimeout):
570+
# The socket has been closed. Don't reset the server.
571+
# Server Discovery And Monitoring Spec: "When an application
572+
# operation fails because of any network error besides a socket
573+
# timeout...."
574+
return
575+
elif issubclass(exc_type, NotMasterError):
576+
# As per the SDAM spec if:
577+
# - the server sees a "not master" error, and
578+
# - the server is not shutting down, and
579+
# - the server version is >= 4.2, then
580+
# we keep the existing connection pool, but mark the server type
581+
# as Unknown and request an immediate check of the server.
582+
# Otherwise, we clear the connection pool, mark the server as
583+
# Unknown and request an immediate check of the server.
584+
err_code = error.details.get('code', -1)
585+
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
586+
# Mark server Unknown, clear the pool, and request check.
587+
self._process_change(ServerDescription(address, error=error))
588+
if is_shutting_down or (err_ctx.max_wire_version <= 7):
589+
# Clear the pool.
553590
server.reset()
554-
555-
# Mark this server Unknown.
591+
server.request_check()
592+
elif issubclass(exc_type, ConnectionFailure):
593+
# "Client MUST replace the server's description with type Unknown
594+
# ... MUST NOT request an immediate check of the server."
556595
self._process_change(ServerDescription(address, error=error))
596+
# Clear the pool.
597+
server.reset()
598+
elif issubclass(exc_type, OperationFailure):
599+
# Do not request an immediate check since the server is likely
600+
# shutting down.
601+
if error.code in helpers._NOT_MASTER_CODES:
602+
self._process_change(ServerDescription(address, error=error))
603+
# Clear the pool.
604+
server.reset()
557605

558-
def _request_check(self, address):
559-
"""Wake one monitor. Hold the lock when calling this."""
560-
server = self._servers.get(address)
606+
def handle_error(self, address, err_ctx):
607+
"""Handle an application error.
561608
562-
# "server" is None if another thread removed it from the topology.
563-
if server:
564-
server.request_check()
609+
May reset the server to Unknown, clear the pool, and request an
610+
immediate check depending on the error and the context.
611+
"""
612+
with self._lock:
613+
self._handle_error(address, err_ctx)
565614

566615
def _request_check_all(self):
567616
"""Wake all monitors. Hold the lock when calling this."""
@@ -692,3 +741,30 @@ def __repr__(self):
692741
if not self._opened:
693742
msg = 'CLOSED '
694743
return '<%s %s%r>' % (self.__class__.__name__, msg, self._description)
744+
745+
746+
class _ErrorContext(object):
747+
"""An error with context for SDAM error handling."""
748+
def __init__(self, error, max_wire_version, sock_generation):
749+
self.error = error
750+
self.max_wire_version = max_wire_version
751+
self.sock_generation = sock_generation
752+
753+
754+
def _is_stale_error_topology_version(current_tv, error_tv):
755+
"""Return True if the error's topologyVersion is <= current."""
756+
if current_tv is None or error_tv is None:
757+
return False
758+
if current_tv['processId'] != error_tv['processId']:
759+
return False
760+
return current_tv['counter'] >= error_tv['counter']
761+
762+
763+
def _is_stale_server_description(current_sd, new_sd):
764+
"""Return True if the new topologyVersion is < current."""
765+
current_tv, new_tv = current_sd.topology_version, new_sd.topology_version
766+
if current_tv is None or new_tv is None:
767+
return False
768+
if current_tv['processId'] != new_tv['processId']:
769+
return False
770+
return current_tv['counter'] > new_tv['counter']

0 commit comments

Comments
 (0)