Skip to content

Commit 3a4ace2

Browse files
authored
Handle watch cancelation messages in Etcd3 (patroni#3426)
Etcd3 sends cancelation message to watch channel when for example revision was compacted, however it doesn't close connection on their side. We solve it by breaking a loop of reading chunked response and closing connection on Patroni side. Close patroni#3396
1 parent e6e70ea commit 3a4ace2

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

patroni/dcs/etcd3.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from ..collections import EMPTY_DICT
2222
from ..exceptions import DCSError, PatroniException
2323
from ..postgresql.mpp import AbstractMPP
24-
from ..utils import deep_compare, enable_keepalive, iter_response_objects, RetryFailedError, USER_AGENT
24+
from ..utils import deep_compare, enable_keepalive, iter_response_objects, parse_bool, RetryFailedError, USER_AGENT
2525
from . import catch_return_false_exception, Cluster, ClusterConfig, \
2626
Failover, Leader, Member, Status, SyncState, TimelineHistory
2727
from .etcd import AbstractEtcd, AbstractEtcdClientWithFailover, catch_etcd_errors, \
@@ -66,6 +66,10 @@ class Etcd3Exception(etcd.EtcdException):
6666
pass
6767

6868

69+
class Etcd3WatchCanceled(Etcd3Exception):
70+
pass
71+
72+
6973
class Etcd3ClientError(Etcd3Exception):
7074

7175
def __init__(self, code: Optional[int] = None, error: Optional[str] = None, status: Optional[int] = None) -> None:
@@ -356,7 +360,6 @@ def handle_auth_errors(self: 'Etcd3Client', func: Callable[..., Any], *args: Any
356360
exc = e
357361
self._reauthenticate = True
358362
if retry:
359-
logger.error('retry = %s', retry)
360363
retry.ensure_deadline(0.5, exc)
361364
elif reauthenticated:
362365
raise exc
@@ -508,6 +511,8 @@ def _process_message(self, message: Dict[str, Any]) -> None:
508511
if 'error' in message:
509512
raise _raise_for_data(message)
510513
result = message.get('result', EMPTY_DICT)
514+
if parse_bool(result.get('canceled')):
515+
raise Etcd3WatchCanceled('Watch canceled')
511516
header = result.get('header', EMPTY_DICT)
512517
self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term'))
513518
events: List[Dict[str, Any]] = result.get('events', [])
@@ -555,8 +560,10 @@ def _build_cache(self) -> None:
555560

556561
try:
557562
self._do_watch(result['header']['revision'])
563+
except Etcd3WatchCanceled:
564+
logger.info('Watch request canceled')
558565
except Exception as e:
559-
# Following exceptions are expected on Windows because the /watch request is done with `read_timeout`
566+
# Following exceptions are expected on Windows because the /watch request is done with `read_timeout`
560567
if not (os.name == 'nt' and isinstance(e, (ReadTimeoutError, ProtocolError))):
561568
logger.error('watchprefix failed: %r', e)
562569
finally:

tests/test_etcd3.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,12 @@ class TestKVCache(BaseTestEtcd3):
9696

9797
@patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
9898
@patch.object(Etcd3Client, 'watchprefix', Mock(return_value=urllib3.response.HTTPResponse()))
99+
@patch.object(urllib3.response.HTTPResponse, 'read_chunked',
100+
Mock(return_value=[b'{"result":{"canceled":true}}']))
99101
def test__build_cache(self):
100-
self.kv_cache._build_cache()
102+
with patch('patroni.dcs.etcd3.logger') as mock_logger:
103+
self.kv_cache._build_cache()
104+
mock_logger.info.assert_called_once_with('Watch request canceled')
101105

102106
def test__do_watch(self):
103107
self.client.watchprefix = Mock(return_value=False)

0 commit comments

Comments
 (0)