@@ -1236,6 +1236,10 @@ def _retry(*args: Any, **kwargs: Any) -> Any:
12361236 return bool (_run_and_handle_exceptions (self ._patch_or_create , self .leader_path , annotations ,
12371237 kind_resource_version , ips = ips , retry = _retry ))
12381238
@staticmethod
def _isotime() -> str:
    """Return the current time as an ISO 8601 formatted string.

    The timestamp is timezone-aware: it is taken with ``tzutc`` as the
    timezone argument, so the resulting string carries an explicit offset.
    The leader-lock code uses this value for the ``renewTime`` and
    ``acquireTime`` annotations.

    :returns: the current timestamp rendered by ``datetime.isoformat()``.
    """
    current = datetime.datetime.now(tzutc)
    return current.isoformat()
1242+
12391243 def update_leader (self , cluster : Cluster , last_lsn : Optional [int ],
12401244 slots : Optional [Dict [str , int ]] = None , failsafe : Optional [Dict [str , str ]] = None ) -> bool :
12411245 kind = self ._kinds .get (self .leader_path )
@@ -1244,7 +1248,7 @@ def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
12441248 if kind and kind_annotations .get (self ._LEADER ) != self ._name :
12451249 return False
12461250
1247- now = datetime . datetime . now ( tzutc ). isoformat ()
1251+ now = self . _isotime ()
12481252 leader_observed_record = kind_annotations or self ._leader_observed_record
12491253 annotations = {self ._LEADER : self ._name , 'ttl' : str (self ._ttl ), 'renewTime' : now ,
12501254 'acquireTime' : leader_observed_record .get ('acquireTime' ) or now ,
@@ -1262,7 +1266,7 @@ def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
12621266 return self ._update_leader_with_retry (annotations , resource_version , self .__ips )
12631267
12641268 def attempt_to_acquire_leader (self ) -> bool :
1265- now = datetime . datetime . now ( tzutc ). isoformat ()
1269+ now = self . _isotime ()
12661270 annotations = {self ._LEADER : self ._name , 'ttl' : str (self ._ttl ),
12671271 'renewTime' : now , 'acquireTime' : now , 'transitions' : '0' }
12681272 if self ._leader_observed_record :
@@ -1277,18 +1281,55 @@ def attempt_to_acquire_leader(self) -> bool:
12771281 annotations ['acquireTime' ] = self ._leader_observed_record .get ('acquireTime' ) or now
12781282 annotations ['transitions' ] = str (transitions )
12791283
1284+ resource_version = self ._leader_resource_version
1285+ if resource_version :
1286+ kind = self ._kinds .get (self .leader_path )
1287+ # If the leader object in the cache was updated, it is better to use the fresh resource_version
1288+ if kind and kind .metadata .resource_version != resource_version :
1289+ kind_annotations = kind and kind .metadata .annotations or EMPTY_DICT
1290+ # But only if the leader annotations didn't change
1291+ if all (kind_annotations .get (k ) == self ._leader_observed_record .get (k ) for k in annotations .keys ()):
1292+ resource_version = kind .metadata .resource_version
1293+
1294+ retry = self ._retry .copy ()
1295+
1296+ def _retry (* args : Any , ** kwargs : Any ) -> Any :
1297+ kwargs ['_retry' ] = retry
1298+ return retry (* args , ** kwargs )
1299+
1300+ handle_conflict = False
12801301 try :
12811302 ret = bool (self ._patch_or_create (self .leader_path , annotations ,
1282- self . _leader_resource_version , retry = self . retry , ips = self .__ips ))
1303+ resource_version , retry = _retry , ips = self .__ips ))
12831304 except k8s_client .rest .ApiException as e :
1284- if e .status == 409 and self . _leader_resource_version : # Conflict in resource_version
1305+ if e .status == 409 and resource_version : # Conflict in resource_version
12851306 # Terminate watchers, it could be a sign that K8s API is in a failed state
12861307 self ._kinds .kill_stream ()
12871308 self ._pods .kill_stream ()
1309+ handle_conflict = True
12881310 ret = False
12891311 except (RetryFailedError , K8sException ) as e :
12901312 raise KubernetesError (e )
12911313
1314+ if handle_conflict and retry .ensure_deadline (1 ):
1315+ # if we are here, that means update failed with 409
1316+ # Try to get the latest version directly from K8s API instead of relying on async cache
1317+ try :
1318+ kind = _retry (self ._api .read_namespaced_kind , self .leader_path , self ._namespace )
1319+ except (RetryFailedError , K8sException ) as e :
1320+ raise KubernetesError (e )
1321+ except Exception as e :
1322+ logger .error ('Failed to get the leader object "%s": %r' , self .leader_path , e )
1323+ else :
1324+ kind_annotations = kind and kind .metadata .annotations or EMPTY_DICT
1325+ kind_resource_version = kind and kind .metadata .resource_version
1326+
1327+ # We can get 409 because we do at least one retry, and the first update might have succeeded,
1328+ # therefore we will check if annotations on the read object match expectations.
1329+ if kind and kind_resource_version != resource_version and \
1330+ all (kind_annotations .get (k ) == v for k , v in annotations .items ()):
1331+ ret = True
1332+
12921333 if not ret :
12931334 logger .info ('Could not take out TTL lock' )
12941335 return ret
0 commit comments