Skip to content

Commit bfa5aaf

Browse files
authored
PYTHON-2832 Provide options to limit number of mongos servers used in connecting to sharded clusters (#754)
1 parent eabd223 commit bfa5aaf

31 files changed

+501
-59
lines changed

doc/changelog.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ Notable improvements
179179

180180
- Enhanced connection pooling to create connections more efficiently and
181181
avoid connection storms.
182+
- :class:`~pymongo.mongo_client.MongoClient` now accepts a URI and keyword
183+
argument `srvMaxHosts` that limits the number of mongos-like hosts a client
184+
will connect to. More specifically, when a mongodb+srv:// connection string
185+
resolves to more than `srvMaxHosts` number of hosts, the client will randomly
186+
choose a `srvMaxHosts` sized subset of hosts.
182187

183188
Issues Resolved
184189
...............

pymongo/common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,8 @@ def validate_tzinfo(dummy, value):
629629
'w': validate_non_negative_int_or_basestring,
630630
'wtimeoutms': validate_non_negative_integer,
631631
'zlibcompressionlevel': validate_zlib_compression_level,
632-
'srvservicename': validate_string
632+
'srvservicename': validate_string,
633+
'srvmaxhosts': validate_non_negative_integer
633634
}
634635

635636
# Dictionary where keys are the names of URI options specific to pymongo,

pymongo/mongo_client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -652,8 +652,8 @@ def __init__(
652652
dbase = None
653653
opts = common._CaseInsensitiveDictionary()
654654
fqdn = None
655-
srv_service_name = keyword_opts.get("srvservicename", None)
656-
655+
srv_service_name = keyword_opts.get("srvservicename")
656+
srv_max_hosts = keyword_opts.get("srvmaxhosts")
657657
if len([h for h in host if "/" in h]) > 1:
658658
raise ConfigurationError("host must not contain multiple MongoDB "
659659
"URIs")
@@ -669,7 +669,9 @@ def __init__(
669669
keyword_opts.cased_key("connecttimeoutms"), timeout)
670670
res = uri_parser.parse_uri(
671671
entity, port, validate=True, warn=True, normalize=False,
672-
connect_timeout=timeout, srv_service_name=srv_service_name)
672+
connect_timeout=timeout,
673+
srv_service_name=srv_service_name,
674+
srv_max_hosts=srv_max_hosts)
673675
seeds.update(res["nodelist"])
674676
username = res["username"] or username
675677
password = res["password"] or password
@@ -703,6 +705,7 @@ def __init__(
703705
if srv_service_name is None:
704706
srv_service_name = opts.get("srvServiceName", common.SRV_SERVICE_NAME)
705707

708+
srv_max_hosts = srv_max_hosts or opts.get("srvmaxhosts")
706709
# Handle security-option conflicts in combined options.
707710
opts = _handle_security_options(opts)
708711
# Normalize combined options.
@@ -745,6 +748,7 @@ def __init__(
745748
srv_service_name=srv_service_name,
746749
direct_connection=options.direct_connection,
747750
load_balanced=options.load_balanced,
751+
srv_max_hosts=srv_max_hosts
748752
)
749753

750754
self._topology = Topology(self._topology_settings)

pymongo/settings.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ def __init__(self,
4141
fqdn=None,
4242
srv_service_name=common.SRV_SERVICE_NAME,
4343
direct_connection=False,
44-
load_balanced=None):
44+
load_balanced=None,
45+
srv_max_hosts=0):
4546
"""Represent MongoClient's configuration.
4647
4748
Take a list of (host, port) pairs and optional replica set name.
@@ -63,7 +64,7 @@ def __init__(self,
6364
self._fqdn = fqdn
6465
self._srv_service_name = srv_service_name
6566
self._heartbeat_frequency = heartbeat_frequency
66-
67+
self._srv_max_hosts = srv_max_hosts or 0
6768
self._direct = direct_connection
6869
self._load_balanced = load_balanced
6970

pymongo/srv_resolver.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Support for resolving hosts and options from mongodb+srv:// URIs."""
1616

1717
import ipaddress
18+
import random
1819

1920
try:
2021
from dns import resolver
@@ -25,7 +26,6 @@
2526
from pymongo.common import CONNECT_TIMEOUT
2627
from pymongo.errors import ConfigurationError
2728

28-
2929
# dnspython can return bytes or str from various parts
3030
# of its API depending on version. We always want str.
3131
def maybe_decode(text):
@@ -48,11 +48,11 @@ def _resolve(*args, **kwargs):
4848

4949
class _SrvResolver(object):
5050
def __init__(self, fqdn,
51-
connect_timeout, srv_service_name):
51+
connect_timeout, srv_service_name, srv_max_hosts=0):
5252
self.__fqdn = fqdn
5353
self.__srv = srv_service_name
5454
self.__connect_timeout = connect_timeout or CONNECT_TIMEOUT
55-
55+
self.__srv_max_hosts = srv_max_hosts or 0
5656
# Validate the fully qualified domain name.
5757
try:
5858
ipaddress.ip_address(fqdn)
@@ -111,7 +111,8 @@ def _get_srv_response_and_hosts(self, encapsulate_errors):
111111
raise ConfigurationError("Invalid SRV host: %s" % (node[0],))
112112
if self.__plist != nlist:
113113
raise ConfigurationError("Invalid SRV host: %s" % (node[0],))
114-
114+
if self.__srv_max_hosts:
115+
nodes = random.sample(nodes, min(self.__srv_max_hosts, len(nodes)))
115116
return results, nodes
116117

117118
def get_hosts(self):

pymongo/topology_description.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Represent a deployment of MongoDB servers."""
1616

1717
from collections import namedtuple
18+
from random import sample
1819

1920
from pymongo import common
2021
from pymongo.errors import ConfigurationError
@@ -223,6 +224,10 @@ def common_wire_version(self):
223224
def heartbeat_frequency(self):
224225
return self._topology_settings.heartbeat_frequency
225226

227+
@property
228+
def srv_max_hosts(self):
229+
return self._topology_settings._srv_max_hosts
230+
226231
def apply_selector(self, selector, address, custom_selector=None):
227232

228233
def apply_local_threshold(selection):
@@ -446,16 +451,23 @@ def _updated_topology_description_srv_polling(topology_description, seedlist):
446451
if set(sds.keys()) == set(seedlist):
447452
return topology_description
448453

449-
# Add SDs corresponding to servers recently added to the SRV record.
450-
for address in seedlist:
451-
if address not in sds:
452-
sds[address] = ServerDescription(address)
453454

454455
# Remove SDs corresponding to servers no longer part of the SRV record.
455456
for address in list(sds.keys()):
456457
if address not in seedlist:
457458
sds.pop(address)
458459

460+
if topology_description.srv_max_hosts != 0:
461+
new_hosts = set(seedlist) - set(sds.keys())
462+
n_to_add = topology_description.srv_max_hosts - len(sds)
463+
if n_to_add > 0:
464+
seedlist = sample(new_hosts, min(n_to_add, len(new_hosts)))
465+
else:
466+
seedlist = []
467+
# Add SDs corresponding to servers recently added to the SRV record.
468+
for address in seedlist:
469+
if address not in sds:
470+
sds[address] = ServerDescription(address)
459471
return TopologyDescription(
460472
topology_description.topology_type,
461473
sds,

pymongo/uri_parser.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,8 @@ def _check_options(nodes, options):
393393

394394

395395
def parse_uri(uri, default_port=DEFAULT_PORT, validate=True, warn=False,
396-
normalize=True, connect_timeout=None, srv_service_name=None):
396+
normalize=True, connect_timeout=None, srv_service_name=None,
397+
srv_max_hosts=None):
397398
"""Parse and validate a MongoDB URI.
398399
399400
Returns a dict of the form::
@@ -494,10 +495,8 @@ def parse_uri(uri, default_port=DEFAULT_PORT, validate=True, warn=False,
494495

495496
if opts:
496497
options.update(split_options(opts, validate, warn, normalize))
497-
498498
if srv_service_name is None:
499499
srv_service_name = options.get("srvServiceName", SRV_SERVICE_NAME)
500-
501500
if '@' in host_part:
502501
userinfo, _, hosts = host_part.rpartition('@')
503502
user, passwd = parse_userinfo(userinfo)
@@ -510,7 +509,7 @@ def parse_uri(uri, default_port=DEFAULT_PORT, validate=True, warn=False,
510509

511510
hosts = unquote_plus(hosts)
512511
fqdn = None
513-
512+
srv_max_hosts = srv_max_hosts or options.get("srvMaxHosts")
514513
if is_srv:
515514
if options.get('directConnection'):
516515
raise ConfigurationError(
@@ -529,7 +528,8 @@ def parse_uri(uri, default_port=DEFAULT_PORT, validate=True, warn=False,
529528
# Use the connection timeout. connectTimeoutMS passed as a keyword
530529
# argument overrides the same option passed in the connection string.
531530
connect_timeout = connect_timeout or options.get("connectTimeoutMS")
532-
dns_resolver = _SrvResolver(fqdn, connect_timeout, srv_service_name)
531+
dns_resolver = _SrvResolver(fqdn, connect_timeout, srv_service_name,
532+
srv_max_hosts)
533533
nodes = dns_resolver.get_hosts()
534534
dns_options = dns_resolver.get_options()
535535
if dns_options:
@@ -542,11 +542,19 @@ def parse_uri(uri, default_port=DEFAULT_PORT, validate=True, warn=False,
542542
for opt, val in parsed_dns_options.items():
543543
if opt not in options:
544544
options[opt] = val
545+
if options.get("loadBalanced") and srv_max_hosts:
546+
raise InvalidURI(
547+
"You cannot specify loadBalanced with srvMaxHosts")
548+
if options.get("replicaSet") and srv_max_hosts:
549+
raise InvalidURI("You cannot specify replicaSet with srvMaxHosts")
545550
if "tls" not in options and "ssl" not in options:
546551
options["tls"] = True if validate else 'true'
547552
elif not is_srv and options.get("srvServiceName") is not None:
548553
raise ConfigurationError("The srvServiceName option is only allowed "
549554
"with 'mongodb+srv://' URIs")
555+
elif not is_srv and srv_max_hosts:
556+
raise ConfigurationError("The srvMaxHosts option is only allowed "
557+
"with 'mongodb+srv://' URIs")
550558
else:
551559
nodes = split_hosts(hosts, default_port=default_port)
552560

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"uri": "mongodb+srv://test20.test.build.10gen.cc/?srvMaxHosts=1",
3+
"seeds": [],
4+
"hosts": [],
5+
"error": true,
6+
"comment": "Should fail because positive integer for srvMaxHosts conflicts with loadBalanced=true (TXT)"
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"uri": "mongodb+srv://test3.test.build.10gen.cc/?loadBalanced=true&srvMaxHosts=1",
3+
"seeds": [],
4+
"hosts": [],
5+
"error": true,
6+
"comment": "Should fail because positive integer for srvMaxHosts conflicts with loadBalanced=true"
7+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"uri": "mongodb+srv://test20.test.build.10gen.cc/?srvMaxHosts=0",
3+
"seeds": [
4+
"localhost.test.build.10gen.cc:27017"
5+
],
6+
"hosts": [
7+
"localhost.test.build.10gen.cc:27017"
8+
],
9+
"options": {
10+
"loadBalanced": true,
11+
"srvMaxHosts": 0,
12+
"ssl": true
13+
}
14+
}

0 commit comments

Comments
 (0)