11package dev .openfeature .contrib .providers .flagd ;
22
33import dev .openfeature .contrib .providers .flagd .resolver .Resolver ;
4- import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
4+ import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
5+ import dev .openfeature .contrib .providers .flagd .resolver .common .Util ;
56import dev .openfeature .contrib .providers .flagd .resolver .grpc .GrpcResolver ;
67import dev .openfeature .contrib .providers .flagd .resolver .grpc .cache .Cache ;
78import dev .openfeature .contrib .providers .flagd .resolver .process .InProcessResolver ;
1213import dev .openfeature .sdk .ImmutableStructure ;
1314import dev .openfeature .sdk .Metadata ;
1415import dev .openfeature .sdk .ProviderEvaluation ;
16+ import dev .openfeature .sdk .ProviderEvent ;
1517import dev .openfeature .sdk .ProviderEventDetails ;
1618import dev .openfeature .sdk .Structure ;
1719import dev .openfeature .sdk .Value ;
1820import java .util .ArrayList ;
1921import java .util .Collections ;
2022import java .util .List ;
23+ import java .util .concurrent .Executors ;
24+ import java .util .concurrent .ScheduledExecutorService ;
25+ import java .util .concurrent .ScheduledFuture ;
26+ import java .util .concurrent .TimeUnit ;
2127import java .util .function .Function ;
2228import lombok .extern .slf4j .Slf4j ;
2329
@@ -31,10 +37,29 @@ public class FlagdProvider extends EventProvider {
3137 private static final String FLAGD_PROVIDER = "flagd" ;
3238 private final Resolver flagResolver ;
3339 private volatile boolean initialized = false ;
34- private volatile boolean connected = false ;
3540 private volatile Structure syncMetadata = new ImmutableStructure ();
3641 private volatile EvaluationContext enrichedContext = new ImmutableContext ();
3742 private final List <Hook > hooks = new ArrayList <>();
43+ private volatile ProviderEvent previousEvent = null ;
44+
45+ /**
46+ * An executor service responsible for scheduling reconnection attempts.
47+ */
48+ private final ScheduledExecutorService reconnectExecutor ;
49+
50+ /**
51+ * A scheduled task for managing reconnection attempts.
52+ */
53+ private ScheduledFuture <?> reconnectTask ;
54+
55+ /**
56+ * The grace period in milliseconds to wait for reconnection before emitting an error event.
57+ */
58+ private final long gracePeriod ;
59+ /**
60+ * The deadline in milliseconds for GRPC operations.
61+ */
62+ private final long deadline ;
3863
3964 protected final void finalize () {
4065 // DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
@@ -55,18 +80,21 @@ public FlagdProvider() {
5580 public FlagdProvider (final FlagdOptions options ) {
5681 switch (options .getResolverType ().asString ()) {
5782 case Config .RESOLVER_IN_PROCESS :
58- this .flagResolver = new InProcessResolver (options , this ::isConnected , this :: onConnectionEvent );
83+ this .flagResolver = new InProcessResolver (options , this ::onProviderEvent );
5984 break ;
6085 case Config .RESOLVER_RPC :
6186 this .flagResolver = new GrpcResolver (
62- options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onConnectionEvent );
87+ options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onProviderEvent );
6388 break ;
6489 default :
6590 throw new IllegalStateException (
6691 String .format ("Requested unsupported resolver type of %s" , options .getResolverType ()));
6792 }
6893 hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
6994 contextEnricher = options .getContextEnricher ();
95+ this .reconnectExecutor = Executors .newSingleThreadScheduledExecutor ();
96+ this .gracePeriod = options .getRetryGracePeriod ();
97+ this .deadline = options .getDeadline ();
7098 }
7199
72100 @ Override
@@ -81,17 +109,22 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
81109 }
82110
83111 this .flagResolver .init ();
84- this .initialized = this .connected = true ;
112+ // block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing
113+ // into the equation
114+ Util .busyWaitAndCheck (this .deadline + 1000 , () -> initialized );
85115 }
86116
87117 @ Override
88118 public synchronized void shutdown () {
89119 if (!this .initialized ) {
90120 return ;
91121 }
92-
93122 try {
94123 this .flagResolver .shutdown ();
124+ if (reconnectExecutor != null ) {
125+ reconnectExecutor .shutdownNow ();
126+ reconnectExecutor .awaitTermination (deadline , TimeUnit .MILLISECONDS );
127+ }
95128 } catch (Exception e ) {
96129 log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
97130 } finally {
@@ -151,47 +184,73 @@ EvaluationContext getEnrichedContext() {
151184 return enrichedContext ;
152185 }
153186
154- private boolean isConnected () {
155- return this .connected ;
156- }
187+ private void onProviderEvent (FlagdProviderEvent flagdProviderEvent ) {
157188
158- private void onConnectionEvent (ConnectionEvent connectionEvent ) {
159- final boolean wasConnected = connected ;
160- final boolean isConnected = connected = connectionEvent .isConnected ();
189+ syncMetadata = flagdProviderEvent .getSyncMetadata ();
190+ if (flagdProviderEvent .getSyncMetadata () != null ) {
191+ enrichedContext = contextEnricher .apply (flagdProviderEvent .getSyncMetadata ());
192+ }
161193
162- syncMetadata = connectionEvent .getSyncMetadata ();
163- enrichedContext = contextEnricher .apply (connectionEvent .getSyncMetadata ());
194+ switch (flagdProviderEvent .getEvent ()) {
195+ case PROVIDER_CONFIGURATION_CHANGED :
196+ if (previousEvent == ProviderEvent .PROVIDER_READY ) {
197+ this .emitProviderConfigurationChanged (ProviderEventDetails .builder ()
198+ .flagsChanged (flagdProviderEvent .getFlagsChanged ())
199+ .message ("configuration changed" )
200+ .build ());
201+ break ;
202+ }
203+ case PROVIDER_READY :
204+ onReady ();
205+ previousEvent = ProviderEvent .PROVIDER_READY ;
206+ break ;
164207
165- if (!initialized ) {
166- return ;
208+ case PROVIDER_ERROR :
209+ if (previousEvent != ProviderEvent .PROVIDER_ERROR ) {
210+ onError ();
211+ }
212+ previousEvent = ProviderEvent .PROVIDER_ERROR ;
213+ break ;
167214 }
215+ }
168216
169- if (!wasConnected && isConnected ) {
170- ProviderEventDetails details = ProviderEventDetails .builder ()
171- .flagsChanged (connectionEvent .getFlagsChanged ())
172- .message ("connected to flagd" )
173- .build ();
174- this .emitProviderReady (details );
175- return ;
217+ private void onReady () {
218+ if (!initialized ) {
219+ initialized = true ;
220+ log .info ("initialized FlagdProvider" );
221+ }
222+ if (reconnectTask != null && !reconnectTask .isCancelled ()) {
223+ reconnectTask .cancel (false );
224+ log .debug ("Reconnection task cancelled as connection became READY." );
176225 }
226+ this .emitProviderReady (
227+ ProviderEventDetails .builder ().message ("connected to flagd" ).build ());
228+ }
177229
178- if (wasConnected && isConnected ) {
179- ProviderEventDetails details = ProviderEventDetails .builder ()
180- .flagsChanged (connectionEvent .getFlagsChanged ())
181- .message ("configuration changed" )
182- .build ();
183- this .emitProviderConfigurationChanged (details );
184- return ;
230+ private void onError () {
231+ log .info ("Connection lost. Emit STALE event..." );
232+ log .debug ("Waiting {}s for connection to become available..." , gracePeriod );
233+ this .emitProviderStale (ProviderEventDetails .builder ()
234+ .message ("there has been an error" )
235+ .build ());
236+
237+ if (reconnectTask != null && !reconnectTask .isCancelled ()) {
238+ reconnectTask .cancel (false );
185239 }
186240
187- if (connectionEvent .isStale ()) {
188- this .emitProviderStale (ProviderEventDetails .builder ()
189- .message ("there has been an error" )
190- .build ());
191- } else {
192- this .emitProviderError (ProviderEventDetails .builder ()
193- .message ("there has been an error" )
194- .build ());
241+ if (!reconnectExecutor .isShutdown ()) {
242+ reconnectTask = reconnectExecutor .schedule (
243+ () -> {
244+ log .debug (
245+ "Provider did not reconnect successfully within {}s. Emit ERROR event..." , gracePeriod );
246+ flagResolver .onError ();
247+ this .emitProviderError (ProviderEventDetails .builder ()
248+ .message ("there has been an error" )
249+ .build ());
250+ ;
251+ },
252+ gracePeriod ,
253+ TimeUnit .SECONDS );
195254 }
196255 }
197256}
0 commit comments