33import logging
44import time
55
6- from datetime import datetime
6+ from datetime import datetime , timezone
77from threading import Event , Thread
8- from typing import Union
8+ from typing import Any , Dict , Tuple , Union
99
1010import six
1111
1212import kubernetes
1313
14- from .dcs import Cluster , Member
14+ from .dcs import AbstractDCS , Cluster , Member
1515from .dcs .kubernetes import catch_kubernetes_errors
1616from .exceptions import DCSError
1717
@@ -29,7 +29,7 @@ def start(self):
2929 def shutdown (self ):
3030 pass
3131
32- def get_active_standby_config (self ) -> Union [dict , None ]:
32+ def get_active_standby_config (self ) -> Union [Dict [ str , Any ] , None ]:
3333 """Returns currently active configuration for standby leader"""
3434 return {}
3535
@@ -52,13 +52,13 @@ def heartbeat(self):
5252 def release (self ):
5353 pass
5454
55- def status (self ):
55+ def status (self ) -> Dict [ str , Any ] :
5656 return {}
5757
5858 def should_failover (self ) -> bool :
5959 return False
6060
61- def on_shutdown (self , checkpoint_location ):
61+ def on_shutdown (self , checkpoint_location : int ):
6262 pass
6363
6464
@@ -71,7 +71,7 @@ def status(self):
7171class MultisiteController (Thread , AbstractSiteController ):
7272 is_active = True
7373
74- def __init__ (self , config , on_change = None ):
74+ def __init__ (self , config : Dict [ str , Any ], on_change : None = None ):
7575 super ().__init__ ()
7676 self .stop_requested = False
7777 self .on_change = on_change
@@ -82,10 +82,11 @@ def __init__(self, config, on_change=None):
8282 self .name = msconfig ['name' ]
8383
8484 if msconfig .get ('update_crd' ):
85- self ._state_updater = KubernetesStateManagement (msconfig .get ('update_crd' ),
86- msconfig .get ('crd_uid' ),
87- reporter = self .name , # Use pod name?
88- crd_api = msconfig .get ('crd_api' , 'acid.zalan.do/v1' ))
85+ self ._state_updater = KubernetesStateManagement (
86+ msconfig .get ('update_crd' ), # pyright: ignore [reportArgumentType]
87+ msconfig .get ('crd_uid' ), # pyright: ignore [reportArgumentType]
88+ reporter = self .name , # Use pod name?
89+ crd_api = msconfig .get ('crd_api' , 'acid.zalan.do/v1' ))
8990 else :
9091 self ._state_updater = None
9192
@@ -105,7 +106,7 @@ def __init__(self, config, on_change=None):
105106 self ._dcs_error = None
106107
107108 @staticmethod
108- def get_dcs_config (config ) :
109+ def get_dcs_config (config : Dict [ str , Any ]) -> Tuple [ Dict [ str , Any ], AbstractDCS ] :
109110 msconfig = config ['multisite' ]
110111
111112 # Multisite configuration inherits values from main configuration
@@ -166,7 +167,7 @@ def release(self):
166167 def should_failover (self ):
167168 return self ._failover_target is not None and self ._failover_target != self .name
168169
169- def on_shutdown (self , checkpoint_location ):
170+ def on_shutdown (self , checkpoint_location : int ):
170171 """ Called when shutdown for multisite failover has completed.
171172 """
172173 # TODO: check if we replicated everything to standby site
@@ -193,7 +194,7 @@ def _set_standby_config(self, other: Member):
193194 logger .info (f"Setting standby configuration to: { self ._standby_config } " )
194195 return old_conf != self ._standby_config
195196
196- def _check_transition (self , leader , note = None ):
197+ def _check_transition (self , leader : bool , note : str = '' ):
197198 if self ._has_leader != leader :
198199 logger .info ("State transition" )
199200 self ._has_leader = leader
@@ -321,7 +322,7 @@ def _observe_leader(self):
321322 # On replicas we need to know the multisite status only for rewinding.
322323 logger .warning (f"Error accessing multisite DCS: { e } " )
323324
324- def _update_history (self , cluster ):
325+ def _update_history (self , cluster : Cluster ):
325326 if cluster .history and cluster .history .lines and isinstance (cluster .history .lines [0 ], dict ):
326327 self .site_switches = cluster .history .lines [0 ].get ('switches' )
327328
@@ -380,28 +381,28 @@ def shutdown(self):
380381
381382
382383class KubernetesStateManagement :
383- def __init__ (self , crd_name , crd_uid , reporter , crd_api ):
384+ def __init__ (self , crd_name : str , crd_uid : str , reporter : str , crd_api : str ):
384385 self .crd_namespace , self .crd_name = (['default' ] + crd_name .rsplit ('.' , 1 ))[- 2 :]
385386 self .crd_uid = crd_uid
386387 self .reporter = reporter
387388 self .crd_api_group , self .crd_api_version = crd_api .rsplit ('/' , 1 )
388389
389390 # TODO: handle config loading when main DCS is not Kubernetes based
390391 # apiclient = k8s_client.ApiClient(False)
391- kubernetes .config .load_incluster_config ()
392+ kubernetes .config .load_incluster_config () # pyright: ignore [reportUnknownMemberType]
392393 apiclient = kubernetes .client .ApiClient ()
393394 self ._customobj_api = kubernetes .client .CustomObjectsApi (apiclient )
394395 self ._events_api = kubernetes .client .EventsV1Api (apiclient )
395396
396397 self ._status_update = None
397398 self ._event_obj = None
398399
399- def state_transition (self , new_state , note ):
400+ def state_transition (self , new_state : str , note : str ):
400401 self ._status_update = {"status" : {"Multisite" : new_state }}
401402
402- failover_time = datetime .utcnow ( ).strftime ("%Y-%m-%dT%H:%M:%S.%fZ" )
403+ failover_time = datetime .now ( timezone . utc ).strftime ("%Y-%m-%dT%H:%M:%S.%fZ" )
403404 reason = 'Promote' if new_state == 'Leader' else 'Demote'
404- if note is None :
405+ if note == '' :
405406 note = 'Acquired multisite leader' if new_state == 'Leader' else 'Became a standby cluster'
406407
407408 self ._event_obj = kubernetes .client .EventsV1Event (
@@ -433,13 +434,12 @@ def store_updates(self):
433434 logger .warning ("Unable to store Kubernetes status update: %s" , e )
434435
435436 @catch_kubernetes_errors
436- def update_crd_state (self , update ):
437- self ._customobj_api .patch_namespaced_custom_object_status (self .crd_api_group , self .crd_api_version ,
438- self .crd_namespace ,
439- 'postgresqls' , self .crd_name + '/status' , update ,
440- field_manager = 'patroni' )
437+ def update_crd_state (self , update : Dict [str , Any ]):
438+ self ._customobj_api .patch_namespaced_custom_object_status ( # pyright: ignore [reportUnknownMemberType]
439+ self .crd_api_group , self .crd_api_version , self .crd_namespace , 'postgresqls' , self .crd_name + '/status' ,
440+ update , field_manager = 'patroni' )
441441
442442 return True
443443
444- def create_failover_event (self , event ):
445- self ._events_api .create_namespaced_event (self .crd_namespace , event )
444+ def create_failover_event (self , event : kubernetes . client . EventsV1Event ):
445+ self ._events_api .create_namespaced_event (self .crd_namespace , event ) # pyright: ignore [reportUnknownMemberType]
0 commit comments