Skip to content

Commit a79a69a

Browse files
authored
Make sure Etcd3 KVCache is not stale when doing get_cluster() (patroni#3373)
We do it in a similar same way as in patroni#3318 and in addition to that we cross check raft_term between `KVCache` and the client connection in the `is_ready()` method. In case of mismatch we reset `KVCache` state by terminating watcher connection. Close patroni#3359
1 parent 2823adf commit a79a69a

File tree

3 files changed

+61
-27
lines changed

3 files changed

+61
-27
lines changed

patroni/dcs/etcd.py

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -101,40 +101,28 @@ def _do_resolve(host: str, port: int) -> List[_AddrInfo]:
101101
return []
102102

103103

104-
class AbstractEtcdClientWithFailover(abc.ABC, etcd.Client):
104+
class StaleEtcdNodeGuard(object):
105105

106-
ERROR_CLS: Type[Exception]
106+
def __init__(self) -> None:
107+
self._reset_cluster_raft_term()
107108

108-
def __init__(self, config: Dict[str, Any], dns_resolver: DnsCachingResolver, cache_ttl: int = 300) -> None:
109+
def _reset_cluster_raft_term(self) -> None:
109110
self._cluster_id = None
110111
self._raft_term = 0
111-
self._dns_resolver = dns_resolver
112-
self.set_machines_cache_ttl(cache_ttl)
113-
self._machines_cache_updated = 0
114-
kwargs = {p: config.get(p) for p in ('host', 'port', 'protocol', 'use_proxies', 'version_prefix',
115-
'username', 'password', 'cert', 'ca_cert') if config.get(p)}
116-
super(AbstractEtcdClientWithFailover, self).__init__(read_timeout=config['retry_timeout'], **kwargs)
117-
# For some reason python3-etcd on debian and ubuntu are not based on the latest version
118-
# Workaround for the case when https://github.com/jplana/python-etcd/pull/196 is not applied
119-
self.http.connection_pool_kw.pop('ssl_version', None)
120-
self._config = config
121-
self._load_machines_cache()
122-
self._allow_reconnect = True
123-
# allow passing retry argument to api_execute in params
124-
self._comparison_conditions.add('retry')
125-
self._read_options.add('retry')
126-
self._del_conditions.add('retry')
127112

128113
def _check_cluster_raft_term(self, cluster_id: Optional[str], value: Union[None, str, int]) -> None:
129114
"""Check that observed Raft Term in Etcd cluster is increasing.
130115
131-
If we observe that the new value is smaller than the previously known one, it could be an
132-
indicator that we connected to a stale node and should switch to some other node.
133-
However, we need to reset the memorized value when we notice that Cluster ID changed.
116+
:param cluster_id: last observed Etcd Cluster ID
117+
:param raft_term: last observed Raft Term
118+
119+
:raises:
120+
:exc::`StaleEtcdNode` if last observed *raft_term* is smaller than previously known *raft_term*.
134121
"""
135122
if not (cluster_id and value):
136123
return
137124

125+
# We need to reset the memorized value when we notice that Cluster ID changed.
138126
if self._cluster_id and self._cluster_id != cluster_id:
139127
logger.warning('Etcd Cluster ID changed from %s to %s', self._cluster_id, cluster_id)
140128
self._raft_term = 0
@@ -151,6 +139,30 @@ def _check_cluster_raft_term(self, cluster_id: Optional[str], value: Union[None,
151139
raise StaleEtcdNode
152140
self._raft_term = raft_term
153141

142+
143+
class AbstractEtcdClientWithFailover(abc.ABC, etcd.Client, StaleEtcdNodeGuard):
144+
145+
ERROR_CLS: Type[Exception]
146+
147+
def __init__(self, config: Dict[str, Any], dns_resolver: DnsCachingResolver, cache_ttl: int = 300) -> None:
148+
StaleEtcdNodeGuard.__init__(self)
149+
self._dns_resolver = dns_resolver
150+
self.set_machines_cache_ttl(cache_ttl)
151+
self._machines_cache_updated = 0
152+
kwargs = {p: config.get(p) for p in ('host', 'port', 'protocol', 'use_proxies', 'version_prefix',
153+
'username', 'password', 'cert', 'ca_cert') if config.get(p)}
154+
super(AbstractEtcdClientWithFailover, self).__init__(read_timeout=config['retry_timeout'], **kwargs)
155+
# For some reason python3-etcd on debian and ubuntu are not based on the latest version
156+
# Workaround for the case when https://github.com/jplana/python-etcd/pull/196 is not applied
157+
self.http.connection_pool_kw.pop('ssl_version', None)
158+
self._config = config
159+
self._load_machines_cache()
160+
self._allow_reconnect = True
161+
# allow passing retry argument to api_execute in params
162+
self._comparison_conditions.add('retry')
163+
self._read_options.add('retry')
164+
self._del_conditions.add('retry')
165+
154166
def _calculate_timeouts(self, etcd_nodes: int, timeout: Optional[float] = None) -> Tuple[int, float, int]:
155167
"""Calculate a request timeout and number of retries per single etcd node.
156168
In case if the timeout per node is too small (less than one second) we will reduce the number of nodes.

patroni/dcs/etcd3.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
from ..utils import deep_compare, enable_keepalive, iter_response_objects, RetryFailedError, USER_AGENT
2525
from . import catch_return_false_exception, Cluster, ClusterConfig, \
2626
Failover, Leader, Member, Status, SyncState, TimelineHistory
27-
from .etcd import AbstractEtcd, AbstractEtcdClientWithFailover, catch_etcd_errors, DnsCachingResolver, Retry
27+
from .etcd import AbstractEtcd, AbstractEtcdClientWithFailover, catch_etcd_errors, \
28+
DnsCachingResolver, Retry, StaleEtcdNode, StaleEtcdNodeGuard
2829

2930
logger = logging.getLogger(__name__)
3031

@@ -432,10 +433,11 @@ def watchprefix(self, key: str, start_revision: Optional[str] = None,
432433
return self.watchrange(key, prefix_range_end(key), start_revision, filters, read_timeout)
433434

434435

435-
class KVCache(Thread):
436+
class KVCache(StaleEtcdNodeGuard, Thread):
436437

437438
def __init__(self, dcs: 'Etcd3', client: 'PatroniEtcd3Client') -> None:
438-
super(KVCache, self).__init__()
439+
Thread.__init__(self)
440+
StaleEtcdNodeGuard.__init__(self)
439441
self.daemon = True
440442
self._dcs = dcs
441443
self._client = client
@@ -505,7 +507,10 @@ def _process_message(self, message: Dict[str, Any]) -> None:
505507
logger.debug('Received message: %s', message)
506508
if 'error' in message:
507509
raise _raise_for_data(message)
508-
events: List[Dict[str, Any]] = message.get('result', {}).get('events', [])
510+
result = message.get('result', EMPTY_DICT)
511+
header = result.get('header', EMPTY_DICT)
512+
self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term'))
513+
events: List[Dict[str, Any]] = result.get('events', [])
509514
for event in events:
510515
self._process_event(event)
511516

@@ -539,8 +544,11 @@ def _do_watch(self, revision: str) -> None:
539544

540545
def _build_cache(self) -> None:
541546
result = self._dcs.retry(self._client.prefix, self._dcs.cluster_prefix)
547+
header = result.get('header', EMPTY_DICT)
542548
with self._object_cache_lock:
549+
self._reset_cluster_raft_term()
543550
self._object_cache = {node['key']: node for node in result.get('kvs', [])}
551+
self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term'))
544552
with self.condition:
545553
self._is_ready = True
546554
self.condition.notify()
@@ -586,6 +594,12 @@ def kill_stream(self) -> None:
586594

587595
def is_ready(self) -> bool:
588596
"""Must be called only when holding the lock on `condition`"""
597+
if self._is_ready:
598+
try:
599+
self._client._check_cluster_raft_term(self._cluster_id, self._raft_term)
600+
except StaleEtcdNode:
601+
self._is_ready = False
602+
self.kill_stream()
589603
return self._is_ready
590604

591605

tests/test_etcd3.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def mock_urlopen(self, method, url, **kwargs):
5151
elif url.endswith('/watch'):
5252
key = base64_encode('/patroni/test/config')
5353
ret.read_chunked = Mock(return_value=[json.dumps({
54-
'result': {'events': [
54+
'result': {'header': {'cluster_id': '1', 'raft_term': 1}, 'events': [
5555
{'kv': {'key': key, 'value': base64_encode('bar'), 'mod_revision': '2'}},
5656
{'kv': {'key': key, 'value': base64_encode('buzz'), 'mod_revision': '3'}},
5757
{'type': 'DELETE', 'kv': {'key': key, 'mod_revision': '4'}},
@@ -120,6 +120,14 @@ def test_kill_stream(self):
120120
type(mock_conn).sock = PropertyMock(side_effect=Exception)
121121
self.kv_cache.kill_stream()
122122

123+
@patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
124+
def test_is_ready(self):
125+
self.kv_cache._build_cache()
126+
with self.kv_cache.condition:
127+
self.kv_cache._is_ready = True
128+
self.client._raft_term = 2
129+
self.assertFalse(self.kv_cache.is_ready())
130+
123131

124132
class TestPatroniEtcd3Client(BaseTestEtcd3):
125133

0 commit comments

Comments
 (0)