Skip to content

Commit f85a9f9

Browse files
committed
PYTHON-1675 SRV polling for mongos discovery
1 parent afbf18b commit f85a9f9

16 files changed

+545
-107
lines changed

doc/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include:
3939
enabled by default. See the :class:`~pymongo.mongo_client.MongoClient`
4040
documentation for details.
4141
- Support zstandard for wire protocol compression.
42+
- Support for periodically polling DNS SRV records to update the mongos proxy
43+
list without having to change client configuration.
4244

4345
Now that supported operations are retried automatically and transparently,
4446
users should consider adjusting any custom retry logic to prevent

pymongo/common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@
7373
# Spec requires at least 500ms between ismaster calls.
7474
MIN_HEARTBEAT_INTERVAL = 0.5
7575

76+
# Spec requires at least 60s between SRV rescans.
77+
MIN_SRV_RESCAN_INTERVAL = 60
78+
7679
# Default connectTimeout in seconds.
7780
CONNECT_TIMEOUT = 20.0
7881

pymongo/mongo_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ def __init__(
585585
password = None
586586
dbase = None
587587
opts = {}
588+
fqdn = None
588589
for entity in host:
589590
if "://" in entity:
590591
res = uri_parser.parse_uri(
@@ -594,6 +595,7 @@ def __init__(
594595
password = res["password"] or password
595596
dbase = res["database"] or dbase
596597
opts = res["options"]
598+
fqdn = res["fqdn"]
597599
else:
598600
seeds.update(uri_parser.split_hosts(entity, port))
599601
if not seeds:
@@ -673,7 +675,8 @@ def __init__(
673675
local_threshold_ms=options.local_threshold_ms,
674676
server_selection_timeout=options.server_selection_timeout,
675677
server_selector=options.server_selector,
676-
heartbeat_frequency=options.heartbeat_frequency)
678+
heartbeat_frequency=options.heartbeat_frequency,
679+
fqdn=fqdn)
677680

678681
self._topology = Topology(self._topology_settings)
679682
if connect:

pymongo/monitor.py

Lines changed: 95 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,42 @@
1818

1919
from pymongo import common, periodic_executor
2020
from pymongo.errors import OperationFailure
21-
from pymongo.server_type import SERVER_TYPE
2221
from pymongo.monotonic import time as _time
2322
from pymongo.read_preferences import MovingAverage
2423
from pymongo.server_description import ServerDescription
24+
from pymongo.server_type import SERVER_TYPE
25+
from pymongo.srv_resolver import _SrvResolver
26+
27+
28+
class MonitorBase(object):
29+
def __init__(self, *args, **kwargs):
30+
"""Override this method to create an executor."""
31+
raise NotImplementedError
32+
33+
def open(self):
34+
"""Start monitoring, or restart after a fork.
35+
36+
Multiple calls have no effect.
37+
"""
38+
self._executor.open()
39+
40+
def close(self):
41+
"""Close and stop monitoring.
42+
43+
open() restarts the monitor after closing.
44+
"""
45+
self._executor.close()
46+
47+
def join(self, timeout=None):
48+
"""Wait for the monitor to stop."""
49+
self._executor.join(timeout)
50+
51+
def request_check(self):
52+
"""If the monitor is sleeping, wake it soon."""
53+
self._executor.wake()
2554

2655

27-
class Monitor(object):
56+
class Monitor(MonitorBase):
2857
def __init__(
2958
self,
3059
server_description,
@@ -68,31 +97,13 @@ def target():
6897
self_ref = weakref.ref(self, executor.close)
6998
self._topology = weakref.proxy(topology, executor.close)
7099

71-
def open(self):
72-
"""Start monitoring, or restart after a fork.
73-
74-
Multiple calls have no effect.
75-
"""
76-
self._executor.open()
77-
78100
def close(self):
79-
"""Close and stop monitoring.
80-
81-
open() restarts the monitor after closing.
82-
"""
83-
self._executor.close()
101+
super(Monitor, self).close()
84102

85103
# Increment the pool_id and maybe close the socket. If the executor
86104
# thread has the socket checked out, it will be closed when checked in.
87105
self._pool.reset()
88106

89-
def join(self, timeout=None):
90-
self._executor.join(timeout)
91-
92-
def request_check(self):
93-
"""If the monitor is sleeping, wake and check the server soon."""
94-
self._executor.wake()
95-
96107
def _run(self):
97108
try:
98109
self._server_description = self._check_with_retry()
@@ -182,3 +193,66 @@ def _check_with_socket(self, sock_info):
182193
self._topology.receive_cluster_time(
183194
exc.details.get('$clusterTime'))
184195
raise
196+
197+
198+
class SrvMonitor(MonitorBase):
199+
def __init__(self, topology, topology_settings):
200+
"""Class to poll SRV records on a background thread.
201+
202+
Pass a Topology and a TopologySettings.
203+
204+
The Topology is weakly referenced.
205+
"""
206+
self._settings = topology_settings
207+
self._fqdn = self._settings.fqdn
208+
209+
# We strongly reference the executor and it weakly references us via
210+
# this closure. When the monitor is freed, stop the executor soon.
211+
def target():
212+
monitor = self_ref()
213+
if monitor is None:
214+
return False # Stop the executor.
215+
SrvMonitor._run(monitor)
216+
return True
217+
218+
executor = periodic_executor.PeriodicExecutor(
219+
interval=common.MIN_SRV_RESCAN_INTERVAL,
220+
min_interval=self._settings.heartbeat_frequency,
221+
target=target,
222+
name="pymongo_srv_polling_thread")
223+
224+
self._executor = executor
225+
226+
# Avoid cycles. When self or topology is freed, stop executor soon.
227+
self_ref = weakref.ref(self, executor.close)
228+
self._topology = weakref.proxy(topology, executor.close)
229+
230+
def _run(self):
231+
try:
232+
self._seedlist = self._get_seedlist()
233+
self._topology.on_srv_update(self._seedlist)
234+
except ReferenceError:
235+
# Topology was garbage-collected.
236+
self.close()
237+
238+
def _get_seedlist(self):
239+
"""Poll SRV records for a seedlist.
240+
241+
Returns a list of ServerDescriptions.
242+
"""
243+
try:
244+
seedlist, ttl = _SrvResolver(self._fqdn).get_hosts_and_min_ttl()
245+
if len(seedlist) == 0:
246+
# As per the spec: this should be treated as a failure.
247+
raise Exception
248+
except Exception:
249+
# As per the spec, upon encountering an error:
250+
# - An error must not be raised
251+
# - SRV records must be rescanned every heartbeatFrequencyMS
252+
# - Topology must be left unchanged
253+
self.request_check()
254+
return self._seedlist
255+
else:
256+
self._executor.update_interval(
257+
max(ttl, common.MIN_SRV_RESCAN_INTERVAL))
258+
return seedlist

pymongo/periodic_executor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ def wake(self):
102102
"""Execute the target function soon."""
103103
self._event = True
104104

105+
def update_interval(self, new_interval):
106+
self._interval = new_interval
107+
105108
def __should_stop(self):
106109
with self._lock:
107110
if self._stopped:

pymongo/settings.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
from pymongo import common, monitor, pool
2121
from pymongo.common import LOCAL_THRESHOLD_MS, SERVER_SELECTION_TIMEOUT
2222
from pymongo.errors import ConfigurationError
23-
from pymongo.topology_description import TOPOLOGY_TYPE
2423
from pymongo.pool import PoolOptions
2524
from pymongo.server_description import ServerDescription
25+
from pymongo.topology_description import TOPOLOGY_TYPE
2626

2727

2828
class TopologySettings(object):
@@ -36,7 +36,8 @@ def __init__(self,
3636
local_threshold_ms=LOCAL_THRESHOLD_MS,
3737
server_selection_timeout=SERVER_SELECTION_TIMEOUT,
3838
heartbeat_frequency=common.HEARTBEAT_FREQUENCY,
39-
server_selector=None):
39+
server_selector=None,
40+
fqdn=None):
4041
"""Represent MongoClient's configuration.
4142
4243
Take a list of (host, port) pairs and optional replica set name.
@@ -55,6 +56,7 @@ def __init__(self,
5556
self._local_threshold_ms = local_threshold_ms
5657
self._server_selection_timeout = server_selection_timeout
5758
self._server_selector = server_selector
59+
self._fqdn = fqdn
5860
self._heartbeat_frequency = heartbeat_frequency
5961
self._direct = (len(self._seeds) == 1 and not replica_set_name)
6062
self._topology_id = ObjectId()
@@ -100,6 +102,10 @@ def server_selector(self):
100102
def heartbeat_frequency(self):
101103
return self._heartbeat_frequency
102104

105+
@property
106+
def fqdn(self):
107+
return self._fqdn
108+
103109
@property
104110
def direct(self):
105111
"""Connect directly to a single server, or use a set of servers?

pymongo/srv_resolver.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you
4+
# may not use this file except in compliance with the License. You
5+
# may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
# implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
15+
"""Support for resolving hosts and options from mongodb+srv:// URIs."""
16+
17+
try:
18+
from dns import resolver
19+
_HAVE_DNSPYTHON = True
20+
except ImportError:
21+
_HAVE_DNSPYTHON = False
22+
23+
from bson.py3compat import PY3
24+
25+
from pymongo.errors import ConfigurationError
26+
27+
28+
if PY3:
29+
# dnspython can return bytes or str from various parts
30+
# of its API depending on version. We always want str.
31+
def maybe_decode(text):
32+
if isinstance(text, bytes):
33+
return text.decode()
34+
return text
35+
else:
36+
def maybe_decode(text):
37+
return text
38+
39+
40+
class _SrvResolver(object):
41+
def __init__(self, fqdn):
42+
self.__fqdn = fqdn
43+
44+
# Validate the fully qualified domain name.
45+
try:
46+
self.__plist = self.__fqdn.split(".")[1:]
47+
except Exception:
48+
raise ConfigurationError("Invalid URI host")
49+
self.__slen = len(self.__plist)
50+
if self.__slen < 2:
51+
raise ConfigurationError("Invalid URI host")
52+
53+
def get_options(self):
54+
try:
55+
results = resolver.query(self.__fqdn, 'TXT')
56+
except (resolver.NoAnswer, resolver.NXDOMAIN):
57+
# No TXT records
58+
return None
59+
except Exception as exc:
60+
raise ConfigurationError(str(exc))
61+
if len(results) > 1:
62+
raise ConfigurationError('Only one TXT record is supported')
63+
return (
64+
b'&'.join([b''.join(res.strings) for res in results])).decode(
65+
'utf-8')
66+
67+
def _resolve_uri(self, encapsulate_errors):
68+
try:
69+
results = resolver.query('_mongodb._tcp.' + self.__fqdn, 'SRV')
70+
except Exception as exc:
71+
if not encapsulate_errors:
72+
# Raise the original error.
73+
raise
74+
# Else, raise all errors as ConfigurationError.
75+
raise ConfigurationError(str(exc))
76+
return results
77+
78+
def _get_srv_response_and_hosts(self, encapsulate_errors):
79+
results = self._resolve_uri(encapsulate_errors)
80+
81+
# Construct address tuples
82+
nodes = [
83+
(maybe_decode(res.target.to_text(omit_final_dot=True)), res.port)
84+
for res in results]
85+
86+
# Validate hosts
87+
for node in nodes:
88+
try:
89+
nlist = node[0].split(".")[1:][-self.__slen:]
90+
except Exception:
91+
raise ConfigurationError("Invalid SRV host")
92+
if self.__plist != nlist:
93+
raise ConfigurationError("Invalid SRV host")
94+
95+
return results, nodes
96+
97+
def get_hosts(self):
98+
_, nodes = self._get_srv_response_and_hosts(True)
99+
return nodes
100+
101+
def get_hosts_and_min_ttl(self):
102+
results, nodes = self._get_srv_response_and_hosts(False)
103+
return nodes, results.rrset.ttl

0 commit comments

Comments
 (0)