11package dev .openfeature .contrib .providers .flagd ;
22
33import dev .openfeature .contrib .providers .flagd .resolver .Resolver ;
4- import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
4+ import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
5+ import dev .openfeature .contrib .providers .flagd .resolver .common .Util ;
56import dev .openfeature .contrib .providers .flagd .resolver .grpc .GrpcResolver ;
67import dev .openfeature .contrib .providers .flagd .resolver .grpc .cache .Cache ;
78import dev .openfeature .contrib .providers .flagd .resolver .process .InProcessResolver ;
1213import dev .openfeature .sdk .ImmutableStructure ;
1314import dev .openfeature .sdk .Metadata ;
1415import dev .openfeature .sdk .ProviderEvaluation ;
16+ import dev .openfeature .sdk .ProviderEvent ;
1517import dev .openfeature .sdk .ProviderEventDetails ;
1618import dev .openfeature .sdk .Structure ;
1719import dev .openfeature .sdk .Value ;
1820import java .util .ArrayList ;
1921import java .util .Collections ;
2022import java .util .List ;
23+ import java .util .concurrent .Executors ;
24+ import java .util .concurrent .ScheduledExecutorService ;
25+ import java .util .concurrent .ScheduledFuture ;
26+ import java .util .concurrent .TimeUnit ;
2127import java .util .function .Function ;
2228import lombok .extern .slf4j .Slf4j ;
2329
@@ -30,11 +36,31 @@ public class FlagdProvider extends EventProvider {
3036 private Function <Structure , EvaluationContext > contextEnricher ;
3137 private static final String FLAGD_PROVIDER = "flagd" ;
3238 private final Resolver flagResolver ;
33- private volatile boolean initialized = false ;
34- private volatile boolean connected = false ;
35- private volatile Structure syncMetadata = new ImmutableStructure ();
36- private volatile EvaluationContext enrichedContext = new ImmutableContext ();
3739 private final List <Hook > hooks = new ArrayList <>();
40+ private final EventsLock eventsLock = new EventsLock ();
41+
42+ /**
43+ * An executor service responsible for emitting
44+ * {@link ProviderEvent#PROVIDER_ERROR} after the provider went
45+ * {@link ProviderEvent#PROVIDER_STALE} for {@link #gracePeriod} seconds.
46+ */
47+ private final ScheduledExecutorService errorExecutor ;
48+
49+ /**
50+ * A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
51+ */
52+ private ScheduledFuture <?> errorTask ;
53+
54+ /**
55+ * The grace period in milliseconds to wait after
56+ * {@link ProviderEvent#PROVIDER_STALE} before emitting a
57+ * {@link ProviderEvent#PROVIDER_ERROR}.
58+ */
59+ private final long gracePeriod ;
60+ /**
61+ * The deadline in milliseconds for GRPC operations.
62+ */
63+ private final long deadline ;
3864
3965 protected final void finalize () {
4066 // DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
@@ -55,18 +81,34 @@ public FlagdProvider() {
5581 public FlagdProvider (final FlagdOptions options ) {
5682 switch (options .getResolverType ().asString ()) {
5783 case Config .RESOLVER_IN_PROCESS :
58- this .flagResolver = new InProcessResolver (options , this ::isConnected , this :: onConnectionEvent );
84+ this .flagResolver = new InProcessResolver (options , this ::onProviderEvent );
5985 break ;
6086 case Config .RESOLVER_RPC :
6187 this .flagResolver = new GrpcResolver (
62- options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onConnectionEvent );
88+ options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onProviderEvent );
6389 break ;
6490 default :
6591 throw new IllegalStateException (
6692 String .format ("Requested unsupported resolver type of %s" , options .getResolverType ()));
6793 }
6894 hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
6995 contextEnricher = options .getContextEnricher ();
96+ errorExecutor = Executors .newSingleThreadScheduledExecutor ();
97+ gracePeriod = options .getRetryGracePeriod ();
98+ deadline = options .getDeadline ();
99+ }
100+
101+ /**
102+ * Internal constructor for test cases.
103+ * DO NOT MAKE PUBLIC
104+ */
105+ FlagdProvider (Resolver resolver , boolean initialized ) {
106+ this .flagResolver = resolver ;
107+ deadline = Config .DEFAULT_DEADLINE ;
108+ gracePeriod = Config .DEFAULT_STREAM_RETRY_GRACE_PERIOD ;
109+ hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
110+ errorExecutor = Executors .newSingleThreadScheduledExecutor ();
111+ this .eventsLock .initialized = initialized ;
70112 }
71113
72114 @ Override
@@ -75,27 +117,39 @@ public List<Hook> getProviderHooks() {
75117 }
76118
77119 @ Override
78- public synchronized void initialize (EvaluationContext evaluationContext ) throws Exception {
79- if (this .initialized ) {
80- return ;
81- }
120+ public void initialize (EvaluationContext evaluationContext ) throws Exception {
121+ synchronized (eventsLock ) {
122+ if (eventsLock .initialized ) {
123+ return ;
124+ }
82125
83- this .flagResolver .init ();
84- this .initialized = this .connected = true ;
126+ flagResolver .init ();
127+ }
128+ // block till ready - this works with deadline fine for rpc, but with in_process
129+ // we also need to take parsing into the equation
130+ // TODO: evaluate where we are losing time, so we can remove this magic number -
131+ // follow up
132+ // wait outside of the synchonrization or we'll deadlock
133+ Util .busyWaitAndCheck (this .deadline * 2 , () -> eventsLock .initialized );
85134 }
86135
87136 @ Override
88- public synchronized void shutdown () {
89- if (!this .initialized ) {
90- return ;
91- }
92-
93- try {
94- this .flagResolver .shutdown ();
95- } catch (Exception e ) {
96- log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
97- } finally {
98- this .initialized = false ;
137+ public void shutdown () {
138+ synchronized (eventsLock ) {
139+ if (!eventsLock .initialized ) {
140+ return ;
141+ }
142+ try {
143+ this .flagResolver .shutdown ();
144+ if (errorExecutor != null ) {
145+ errorExecutor .shutdownNow ();
146+ errorExecutor .awaitTermination (deadline , TimeUnit .MILLISECONDS );
147+ }
148+ } catch (Exception e ) {
149+ log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
150+ } finally {
151+ eventsLock .initialized = false ;
152+ }
99153 }
100154 }
101155
@@ -106,27 +160,27 @@ public Metadata getMetadata() {
106160
107161 @ Override
108162 public ProviderEvaluation <Boolean > getBooleanEvaluation (String key , Boolean defaultValue , EvaluationContext ctx ) {
109- return this . flagResolver .booleanEvaluation (key , defaultValue , ctx );
163+ return flagResolver .booleanEvaluation (key , defaultValue , ctx );
110164 }
111165
112166 @ Override
113167 public ProviderEvaluation <String > getStringEvaluation (String key , String defaultValue , EvaluationContext ctx ) {
114- return this . flagResolver .stringEvaluation (key , defaultValue , ctx );
168+ return flagResolver .stringEvaluation (key , defaultValue , ctx );
115169 }
116170
117171 @ Override
118172 public ProviderEvaluation <Double > getDoubleEvaluation (String key , Double defaultValue , EvaluationContext ctx ) {
119- return this . flagResolver .doubleEvaluation (key , defaultValue , ctx );
173+ return flagResolver .doubleEvaluation (key , defaultValue , ctx );
120174 }
121175
122176 @ Override
123177 public ProviderEvaluation <Integer > getIntegerEvaluation (String key , Integer defaultValue , EvaluationContext ctx ) {
124- return this . flagResolver .integerEvaluation (key , defaultValue , ctx );
178+ return flagResolver .integerEvaluation (key , defaultValue , ctx );
125179 }
126180
127181 @ Override
128182 public ProviderEvaluation <Value > getObjectEvaluation (String key , Value defaultValue , EvaluationContext ctx ) {
129- return this . flagResolver .objectEvaluation (key , defaultValue , ctx );
183+ return flagResolver .objectEvaluation (key , defaultValue , ctx );
130184 }
131185
132186 /**
@@ -139,7 +193,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
139193 * @return Object map representing sync metadata
140194 */
141195 protected Structure getSyncMetadata () {
142- return new ImmutableStructure (syncMetadata .asMap ());
196+ return new ImmutableStructure (eventsLock . syncMetadata .asMap ());
143197 }
144198
145199 /**
@@ -148,50 +202,109 @@ protected Structure getSyncMetadata() {
148202 * @return context
149203 */
150204 EvaluationContext getEnrichedContext () {
151- return enrichedContext ;
205+ return eventsLock . enrichedContext ;
152206 }
153207
154- private boolean isConnected () {
155- return this .connected ;
156- }
208+ @ SuppressWarnings ("checkstyle:fallthrough" )
209+ private void onProviderEvent (FlagdProviderEvent flagdProviderEvent ) {
157210
158- private void onConnectionEvent (ConnectionEvent connectionEvent ) {
159- final boolean wasConnected = connected ;
160- final boolean isConnected = connected = connectionEvent .isConnected ();
211+ synchronized (eventsLock ) {
212+ log .info ("FlagdProviderEvent: {}" , flagdProviderEvent );
213+ eventsLock .syncMetadata = flagdProviderEvent .getSyncMetadata ();
214+ if (flagdProviderEvent .getSyncMetadata () != null ) {
215+ eventsLock .enrichedContext = contextEnricher .apply (flagdProviderEvent .getSyncMetadata ());
216+ }
161217
162- syncMetadata = connectionEvent .getSyncMetadata ();
163- enrichedContext = contextEnricher .apply (connectionEvent .getSyncMetadata ());
218+ /*
219+ * We only use Error and Ready as previous states.
220+ * As error will first be emitted as Stale, and only turns after a while into an
221+ * emitted Error.
222+ * Ready is needed, as the InProcessResolver does not have a dedicated ready
223+ * event, hence we need to
224+ * forward a configuration changed to the ready, if we are not in the ready
225+ * state.
226+ */
227+ switch (flagdProviderEvent .getEvent ()) {
228+ case PROVIDER_CONFIGURATION_CHANGED :
229+ if (eventsLock .previousEvent == ProviderEvent .PROVIDER_READY ) {
230+ onConfigurationChanged (flagdProviderEvent );
231+ break ;
232+ }
233+ // intentional fall through, a not-ready change will trigger a ready.
234+ case PROVIDER_READY :
235+ onReady ();
236+ eventsLock .previousEvent = ProviderEvent .PROVIDER_READY ;
237+ break ;
164238
165- if (!initialized ) {
166- return ;
239+ case PROVIDER_ERROR :
240+ if (eventsLock .previousEvent != ProviderEvent .PROVIDER_ERROR ) {
241+ onError ();
242+ }
243+ eventsLock .previousEvent = ProviderEvent .PROVIDER_ERROR ;
244+ break ;
245+ default :
246+ log .info ("Unknown event {}" , flagdProviderEvent .getEvent ());
247+ }
167248 }
249+ }
250+
251+ private void onConfigurationChanged (FlagdProviderEvent flagdProviderEvent ) {
252+ this .emitProviderConfigurationChanged (ProviderEventDetails .builder ()
253+ .flagsChanged (flagdProviderEvent .getFlagsChanged ())
254+ .message ("configuration changed" )
255+ .build ());
256+ }
168257
169- if (!wasConnected && isConnected ) {
170- ProviderEventDetails details = ProviderEventDetails .builder ()
171- .flagsChanged (connectionEvent .getFlagsChanged ())
172- .message ("connected to flagd" )
173- .build ();
174- this .emitProviderReady (details );
175- return ;
258+ private void onReady () {
259+ if (!eventsLock .initialized ) {
260+ eventsLock .initialized = true ;
261+ log .info ("initialized FlagdProvider" );
176262 }
263+ if (errorTask != null && !errorTask .isCancelled ()) {
264+ errorTask .cancel (false );
265+ log .debug ("Reconnection task cancelled as connection became READY." );
266+ }
267+ this .emitProviderReady (
268+ ProviderEventDetails .builder ().message ("connected to flagd" ).build ());
269+ }
177270
178- if (wasConnected && isConnected ) {
179- ProviderEventDetails details = ProviderEventDetails .builder ()
180- .flagsChanged (connectionEvent .getFlagsChanged ())
181- .message ("configuration changed" )
182- .build ();
183- this .emitProviderConfigurationChanged (details );
184- return ;
271+ private void onError () {
272+ log .info ("Connection lost. Emit STALE event..." );
273+ log .debug ("Waiting {}s for connection to become available..." , gracePeriod );
274+ this .emitProviderStale (ProviderEventDetails .builder ()
275+ .message ("there has been an error" )
276+ .build ());
277+
278+ if (errorTask != null && !errorTask .isCancelled ()) {
279+ errorTask .cancel (false );
185280 }
186281
187- if (connectionEvent .isStale ()) {
188- this .emitProviderStale (ProviderEventDetails .builder ()
189- .message ("there has been an error" )
190- .build ());
191- } else {
192- this .emitProviderError (ProviderEventDetails .builder ()
193- .message ("there has been an error" )
194- .build ());
282+ if (!errorExecutor .isShutdown ()) {
283+ errorTask = errorExecutor .schedule (
284+ () -> {
285+ if (eventsLock .previousEvent == ProviderEvent .PROVIDER_ERROR ) {
286+ log .debug (
287+ "Provider did not reconnect successfully within {}s. Emit ERROR event..." ,
288+ gracePeriod );
289+ flagResolver .onError ();
290+ this .emitProviderError (ProviderEventDetails .builder ()
291+ .message ("there has been an error" )
292+ .build ());
293+ }
294+ },
295+ gracePeriod ,
296+ TimeUnit .SECONDS );
195297 }
196298 }
299+
300+ /**
301+ * Contains all fields we need to worry about locking, used as intrinsic lock
302+ * for sync blocks.
303+ */
304+ static class EventsLock {
305+ volatile ProviderEvent previousEvent = null ;
306+ volatile Structure syncMetadata = new ImmutableStructure ();
307+ volatile boolean initialized = false ;
308+ volatile EvaluationContext enrichedContext = new ImmutableContext ();
309+ }
197310}
0 commit comments