 from cachebox import BaseCacheImpl, LRUCache
 from google.protobuf.json_format import MessageToDict
 from google.protobuf.struct_pb2 import Struct
+from grpc import ChannelConnectivity

 from openfeature.evaluation_context import EvaluationContext
 from openfeature.event import ProviderEventDetails
@@ -47,53 +48,60 @@ def __init__(
             [ProviderEventDetails], None
         ],
     ):
+        self.active = False
         self.config = config
         self.emit_provider_ready = emit_provider_ready
         self.emit_provider_error = emit_provider_error
         self.emit_provider_stale = emit_provider_stale
         self.emit_provider_configuration_changed = emit_provider_configuration_changed
-        self.cache: typing.Optional[BaseCacheImpl] = (
-            LRUCache(maxsize=self.config.max_cache_size)
-            if self.config.cache == CacheType.LRU
-            else None
-        )
-        self.stub, self.channel = self._create_stub()
-        self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
-        self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
-        self.retry_grace_attempts = config.retry_grace_attempts
+        self.cache: typing.Optional[BaseCacheImpl] = self._create_cache()
+
+        self.retry_grace_period = config.retry_grace_period
         self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
         self.deadline = config.deadline_ms * 0.001
         self.connected = False
-
-    def _create_stub(
-        self,
-    ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]:
-        config = self.config
         channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
-        channel = channel_factory(
+
+        # Create the channel with the service config
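+        # The reconnect backoff options below configure gRPC's built-in
+        # reconnection, replacing the manual retry/backoff loop.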
+        options = [
+            ("grpc.keepalive_time_ms", config.keep_alive_time),
+            ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
+            ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
+            ("grpc.min_reconnect_backoff_ms", config.deadline_ms),
+        ]
+
+        self.channel = channel_factory(
             f"{config.host}:{config.port}",
-            options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
+            options=options,
         )
-        stub = evaluation_pb2_grpc.ServiceStub(channel)
+        self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

-        return stub, channel
+        self.thread: typing.Optional[threading.Thread] = None
+        self.timer: typing.Optional[threading.Timer] = None
+        self.active = False
+
+    def _create_cache(self):
+        return (
+            LRUCache(maxsize=self.config.max_cache_size)
+            if self.config.cache == CacheType.LRU
+            else None
+        )

     def initialize(self, evaluation_context: EvaluationContext) -> None:
         self.connect()

     def shutdown(self) -> None:
         self.active = False
         self.channel.close()
-        if self.cache:
-            self.cache.clear()

     def connect(self) -> None:
         self.active = True
-        self.thread = threading.Thread(
-            target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
-        )
-        self.thread.start()

+        # Run monitoring in a separate thread
+        self.monitor_thread = threading.Thread(
+            target=self.monitor, daemon=True, name="FlagdGrpcServiceMonitorThread"
+        )
+        self.monitor_thread.start()
         ## block until ready or deadline reached
         timeout = self.deadline + time.time()
         while not self.connected and time.time() < timeout:
@@ -105,81 +113,87 @@ def connect(self) -> None:
                 "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
             )

+    def monitor(self) -> None:
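+        # React to channel connectivity changes instead of driving a manual
+        # retry loop; gRPC reconnects on its own using the channel options above.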
+        def state_change_callback(new_state: ChannelConnectivity) -> None:
+            logger.debug(f"gRPC state change: {new_state}")
+            if new_state == ChannelConnectivity.READY:
+                if not self.thread or not self.thread.is_alive():
+                    self.thread = threading.Thread(
+                        target=self.listen,
+                        daemon=True,
+                        name="FlagdGrpcServiceWorkerThread",
+                    )
+                    self.thread.start()
+
+                if self.timer and self.timer.is_alive():
+                    logger.debug("gRPC error timer expired")
+                    self.timer.cancel()
+
+            elif new_state == ChannelConnectivity.TRANSIENT_FAILURE:
+                # this is the failed reconnect attempt, so we are going into stale
+                self.emit_provider_stale(
+                    ProviderEventDetails(
+                        message="gRPC sync disconnected, reconnecting",
+                    )
+                )
+                # adding a timer, so we can emit the error event after the grace period
+                self.timer = threading.Timer(self.retry_grace_period, self.emit_error)
+
+                logger.debug("gRPC error timer started")
+                self.timer.start()
+                self.connected = False
+
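+        # try_to_connect=True asks the channel to start connecting as soon as we subscribe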
+        self.channel.subscribe(state_change_callback, try_to_connect=True)
+
+    def emit_error(self) -> None:
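+        # Fired by the grace-period timer when the channel has not recovered in time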
+        logger.debug("gRPC error emitted")
+        if self.cache is not None:
+            self.cache.clear()
+        self.emit_provider_error(
+            ProviderEventDetails(
+                message="gRPC sync disconnected, reconnecting",
+                error_code=ErrorCode.GENERAL,
+            )
+        )
+
     def listen(self) -> None:
-        retry_delay = self.retry_backoff_seconds
+        logger.info("gRPC starting listener thread")
         call_args = (
             {"timeout": self.streamline_deadline_seconds}
             if self.streamline_deadline_seconds > 0
             else {}
         )
-        retry_counter = 0
-        while self.active:
-            request = evaluation_pb2.EventStreamRequest()
+        request = evaluation_pb2.EventStreamRequest()

+        # defining a never ending loop to recreate the stream
+        while self.active:
             try:
-                logger.debug("Setting up gRPC sync flags connection")
-                for message in self.stub.EventStream(request, **call_args):
+                logger.info("Setting up gRPC sync flags connection")
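+                # wait_for_ready=True queues the call until the channel is connected
+                # instead of failing immediately while it is down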
+                for message in self.stub.EventStream(
+                    request, wait_for_ready=True, **call_args
+                ):
                     if message.type == "provider_ready":
-                        if not self.connected:
-                            self.emit_provider_ready(
-                                ProviderEventDetails(
-                                    message="gRPC sync connection established"
-                                )
+                        self.connected = True
+                        self.emit_provider_ready(
+                            ProviderEventDetails(
+                                message="gRPC sync connection established"
                             )
-                            self.connected = True
-                            retry_counter = 0
-                            # reset retry delay after successsful read
-                            retry_delay = self.retry_backoff_seconds
-
+                        )
                     elif message.type == "configuration_change":
                         data = MessageToDict(message)["data"]
                         self.handle_changed_flags(data)

                     if not self.active:
                         logger.info("Terminating gRPC sync thread")
                         return
-            except grpc.RpcError as e:
-                logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
-                # re-create the stub if there's a connection issue - otherwise reconnect does not work as expected
-                self.stub, self.channel = self._create_stub()
+            except grpc.RpcError as e:  # noqa: PERF203
+                # although it seems like this error log is not interesting, without it, the retry is not working as expected
+                logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
             except ParseError:
                 logger.exception(
                     f"Could not parse flag data using flagd syntax: {message=}"
                 )

-            self.connected = False
-            self.on_connection_error(retry_counter, retry_delay)
-
-            retry_delay = self.handle_retry(retry_counter, retry_delay)
-
-            retry_counter = retry_counter + 1
-
-    def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
-        if retry_counter == 0:
-            logger.info("gRPC sync disconnected, reconnecting immediately")
-        else:
-            logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
-            time.sleep(retry_delay)
-        retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
-        return retry_delay
-
-    def on_connection_error(self, retry_counter: int, retry_delay: float) -> None:
-        if retry_counter == self.retry_grace_attempts:
-            if self.cache:
-                self.cache.clear()
-            self.emit_provider_error(
-                ProviderEventDetails(
-                    message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
-                    error_code=ErrorCode.GENERAL,
-                )
-            )
-        elif retry_counter == 1:
-            self.emit_provider_stale(
-                ProviderEventDetails(
-                    message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
-                )
-            )
-
     def handle_changed_flags(self, data: typing.Any) -> None:
         changed_flags = list(data["flags"].keys())
