@@ -113,37 +113,37 @@ def connect(self) -> None:
113113 )
114114
115115 def monitor (self ) -> None :
116- def state_change_callback (new_state : ChannelConnectivity ) -> None :
117- logger .debug (f"gRPC state change: { new_state } " )
118- if new_state == ChannelConnectivity .READY :
119- if not self .thread or not self .thread .is_alive ():
120- self .thread = threading .Thread (
121- target = self .listen ,
122- daemon = True ,
123- name = "FlagdGrpcServiceWorkerThread" ,
124- )
125- self .thread .start ()
126-
127- if self .timer and self .timer .is_alive ():
128- logger .debug ("gRPC error timer expired" )
129- self .timer .cancel ()
130-
131- elif new_state == ChannelConnectivity .TRANSIENT_FAILURE :
132- # this is the failed reonnect attempt so we are going into stale
133- self .emit_provider_stale (
134- ProviderEventDetails (
135- message = "gRPC sync disconnected, reconnecting" ,
136- )
116+ self .channel .subscribe (self ._state_change_callback , try_to_connect = True )
117+
118+ def _state_change_callback (self , new_state : ChannelConnectivity ) -> None :
119+ logger .debug (f"gRPC state change: { new_state } " )
120+ if new_state == ChannelConnectivity .READY :
121+ if not self .thread or not self .thread .is_alive ():
122+ self .thread = threading .Thread (
123+ target = self .listen ,
124+ daemon = True ,
125+ name = "FlagdGrpcServiceWorkerThread" ,
137126 )
138- self .start_time = time .time ()
139- # adding a timer, so we can emit the error event after time
140- self .timer = threading .Timer (self .retry_grace_period , self .emit_error )
127+ self .thread .start ()
141128
142- logger . debug ( "gRPC error timer started" )
143- self . timer . start ( )
144- self .connected = False
129+ if self . timer and self . timer . is_alive ():
130+ logger . debug ( "gRPC error timer expired" )
131+ self .timer . cancel ()
145132
146- self .channel .subscribe (state_change_callback , try_to_connect = True )
133+ elif new_state == ChannelConnectivity .TRANSIENT_FAILURE :
134+ # this is the failed reconnect attempt so we are going into stale
135+ self .emit_provider_stale (
136+ ProviderEventDetails (
137+ message = "gRPC sync disconnected, reconnecting" ,
138+ )
139+ )
140+ self .start_time = time .time ()
141+ # adding a timer, so we can emit the error event after time
142+ self .timer = threading .Timer (self .retry_grace_period , self .emit_error )
143+
144+ logger .debug ("gRPC error timer started" )
145+ self .timer .start ()
146+ self .connected = False
147147
148148 def emit_error (self ) -> None :
149149 logger .debug ("gRPC error emitted" )
0 commit comments