2121from ..collections import EMPTY_DICT
2222from ..exceptions import DCSError , PatroniException
2323from ..postgresql .mpp import AbstractMPP
24- from ..utils import deep_compare , enable_keepalive , iter_response_objects , RetryFailedError , USER_AGENT
24+ from ..utils import deep_compare , enable_keepalive , iter_response_objects , parse_bool , RetryFailedError , USER_AGENT
2525from . import catch_return_false_exception , Cluster , ClusterConfig , \
2626 Failover , Leader , Member , Status , SyncState , TimelineHistory
2727from .etcd import AbstractEtcd , AbstractEtcdClientWithFailover , catch_etcd_errors , \
@@ -66,6 +66,10 @@ class Etcd3Exception(etcd.EtcdException):
6666 pass
6767
6868
69+ class Etcd3WatchCanceled (Etcd3Exception ):
70+ pass
71+
72+
6973class Etcd3ClientError (Etcd3Exception ):
7074
7175 def __init__ (self , code : Optional [int ] = None , error : Optional [str ] = None , status : Optional [int ] = None ) -> None :
@@ -356,7 +360,6 @@ def handle_auth_errors(self: 'Etcd3Client', func: Callable[..., Any], *args: Any
356360 exc = e
357361 self ._reauthenticate = True
358362 if retry :
359- logger .error ('retry = %s' , retry )
360363 retry .ensure_deadline (0.5 , exc )
361364 elif reauthenticated :
362365 raise exc
@@ -508,6 +511,8 @@ def _process_message(self, message: Dict[str, Any]) -> None:
508511 if 'error' in message :
509512 raise _raise_for_data (message )
510513 result = message .get ('result' , EMPTY_DICT )
514+ if parse_bool (result .get ('canceled' )):
515+ raise Etcd3WatchCanceled ('Watch canceled' )
511516 header = result .get ('header' , EMPTY_DICT )
512517 self ._check_cluster_raft_term (header .get ('cluster_id' ), header .get ('raft_term' ))
513518 events : List [Dict [str , Any ]] = result .get ('events' , [])
@@ -555,8 +560,10 @@ def _build_cache(self) -> None:
555560
556561 try :
557562 self ._do_watch (result ['header' ]['revision' ])
563+ except Etcd3WatchCanceled :
564+ logger .info ('Watch request canceled' )
558565 except Exception as e :
559- # Following exceptions are expected on Windows because the /watch request is done with `read_timeout`
566+ # Following exceptions are expected on Windows because the /watch request is done with `read_timeout`
560567 if not (os .name == 'nt' and isinstance (e , (ReadTimeoutError , ProtocolError ))):
561568 logger .error ('watchprefix failed: %r' , e )
562569 finally :
@@ -576,16 +583,21 @@ def run(self) -> None:
576583 time .sleep (1 )
577584
578585 def kill_stream (self ) -> None :
579- sock = None
586+ conn_sock : Any = None
580587 with self ._response_lock :
581588 if isinstance (self ._response , urllib3 .response .HTTPResponse ):
582589 try :
583- sock = self ._response .connection .sock if self ._response .connection else None
590+ conn_sock = self ._response .connection .sock if self ._response .connection else None
584591 except Exception :
585- sock = None
592+ conn_sock = None
586593 else :
587594 self ._response = False
588- if sock :
595+ if conn_sock :
596+ # python-etcd forces usage of pyopenssl if the last one is available.
597+ # In this case HTTPConnection.socket is not inherited from socket.socket, but urllib3 uses custom
598+ # class `WrappedSocket`, which shutdown() method could be incompatible with socket.shutdown().
599+ # Therefore we use WrappedSocket.socket, which points to original `socket` object.
600+ sock : socket .socket = conn_sock .socket if conn_sock .__class__ .__name__ == 'WrappedSocket' else conn_sock
589601 try :
590602 sock .shutdown (socket .SHUT_RDWR )
591603 sock .close ()
0 commit comments