Skip to content

Commit 2c1060c

Browse files
chore: refactor flagd event handling (#398)
Signed-off-by: Kavindu Dodanduwa <[email protected]> Signed-off-by: Kavindu Dodanduwa <[email protected]> Co-authored-by: Giovanni Liva <[email protected]>
1 parent 1d612a9 commit 2c1060c

File tree

15 files changed

+450
-238
lines changed

15 files changed

+450
-238
lines changed
Lines changed: 37 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,7 @@
11
package dev.openfeature.contrib.providers.flagd;
22

3-
import dev.openfeature.contrib.providers.flagd.cache.Cache;
43
import dev.openfeature.contrib.providers.flagd.cache.CacheFactory;
5-
import dev.openfeature.contrib.providers.flagd.grpc.FlagResolution;
6-
import dev.openfeature.contrib.providers.flagd.grpc.GrpcConnector;
7-
import dev.openfeature.contrib.providers.flagd.strategy.ResolveFactory;
8-
import dev.openfeature.contrib.providers.flagd.strategy.ResolveStrategy;
9-
import dev.openfeature.flagd.grpc.Schema.ResolveBooleanRequest;
10-
import dev.openfeature.flagd.grpc.Schema.ResolveFloatRequest;
11-
import dev.openfeature.flagd.grpc.Schema.ResolveIntRequest;
12-
import dev.openfeature.flagd.grpc.Schema.ResolveObjectRequest;
13-
import dev.openfeature.flagd.grpc.Schema.ResolveStringRequest;
4+
import dev.openfeature.contrib.providers.flagd.grpc.GrpcResolution;
145
import dev.openfeature.sdk.EvaluationContext;
156
import dev.openfeature.sdk.EventProvider;
167
import dev.openfeature.sdk.FeatureProvider;
@@ -34,12 +25,7 @@ public class FlagdProvider extends EventProvider implements FeatureProvider {
3425
private static final String FLAGD_PROVIDER = "flagD Provider";
3526

3627
private final ReadWriteLock lock = new ReentrantReadWriteLock();
37-
38-
private final Cache cache;
39-
private final ResolveStrategy strategy;
40-
private final GrpcConnector grpc;
41-
private final FlagResolution flagResolver;
42-
28+
private final Resolver flagResolver;
4329

4430
private ProviderState state = ProviderState.NOT_READY;
4531

@@ -56,28 +42,18 @@ public FlagdProvider() {
5642
* @param options {@link FlagdOptions} with
5743
*/
5844
public FlagdProvider(final FlagdOptions options) {
59-
this.strategy = ResolveFactory.getStrategy(options);
60-
this.cache = CacheFactory.getCache(options);
61-
this.grpc = new GrpcConnector(options, this.cache, this::setState);
62-
this.flagResolver = new FlagResolution(this.cache, this.strategy, this::getState);
63-
}
64-
65-
FlagdProvider(ResolveStrategy strategy, Cache cache, GrpcConnector grpc, FlagResolution resolution) {
66-
this.strategy = strategy;
67-
this.cache = cache;
68-
this.grpc = grpc;
69-
this.flagResolver = resolution;
45+
this.flagResolver = new GrpcResolution(options, CacheFactory.getCache(options), this::getState, this::setState);
7046
}
7147

7248
@Override
73-
public void initialize(EvaluationContext evaluationContext) throws RuntimeException {
74-
this.grpc.initialize();
49+
public void initialize(EvaluationContext evaluationContext) throws Exception {
50+
this.flagResolver.init();
7551
}
7652

7753
@Override
7854
public void shutdown() {
7955
try {
80-
this.grpc.shutdown();
56+
this.flagResolver.shutdown();
8157
} catch (Exception e) {
8258
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
8359
}
@@ -94,6 +70,37 @@ public ProviderState getState() {
9470
}
9571
}
9672

73+
74+
@Override
75+
public Metadata getMetadata() {
76+
return () -> FLAGD_PROVIDER;
77+
}
78+
79+
@Override
80+
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
81+
return this.flagResolver.booleanEvaluation(key, defaultValue, ctx);
82+
}
83+
84+
@Override
85+
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
86+
return this.flagResolver.stringEvaluation(key, defaultValue, ctx);
87+
}
88+
89+
@Override
90+
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
91+
return this.flagResolver.doubleEvaluation(key, defaultValue, ctx);
92+
}
93+
94+
@Override
95+
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
96+
return this.flagResolver.integerEvaluation(key, defaultValue, ctx);
97+
}
98+
99+
@Override
100+
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
101+
return this.flagResolver.objectEvaluation(key, defaultValue, ctx);
102+
}
103+
97104
private void setState(ProviderState newState) {
98105
ProviderState oldState;
99106
Lock l = this.lock.writeLock();
@@ -108,10 +115,6 @@ private void setState(ProviderState newState) {
108115
}
109116

110117
private void handleStateTransition(ProviderState oldState, ProviderState newState) {
111-
// we are connected, reset the gRPC retry logic
112-
if (ProviderState.READY.equals(newState)) {
113-
this.grpc.resetRetryConnection();
114-
}
115118
// we got initialized
116119
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
117120
// nothing to do, the SDK emits the events
@@ -140,61 +143,4 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
140143
this.emitProviderConfigurationChanged(details);
141144
}
142145
}
143-
144-
145-
@Override
146-
public Metadata getMetadata() {
147-
return () -> FLAGD_PROVIDER;
148-
}
149-
150-
@Override
151-
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue,
152-
EvaluationContext ctx) {
153-
154-
ResolveBooleanRequest request = ResolveBooleanRequest.newBuilder().buildPartial();
155-
156-
return this.flagResolver.resolve(key, ctx, request,
157-
this.grpc.getResolver()::resolveBoolean, null);
158-
}
159-
160-
@Override
161-
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue,
162-
EvaluationContext ctx) {
163-
ResolveStringRequest request = ResolveStringRequest.newBuilder().buildPartial();
164-
165-
return this.flagResolver.resolve(key, ctx, request,
166-
this.grpc.getResolver()::resolveString, null);
167-
}
168-
169-
@Override
170-
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue,
171-
EvaluationContext ctx) {
172-
ResolveFloatRequest request = ResolveFloatRequest.newBuilder().buildPartial();
173-
174-
return this.flagResolver.resolve(key, ctx, request,
175-
this.grpc.getResolver()::resolveFloat, null);
176-
}
177-
178-
@Override
179-
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue,
180-
EvaluationContext ctx) {
181-
182-
ResolveIntRequest request = ResolveIntRequest.newBuilder().buildPartial();
183-
184-
return this.flagResolver.resolve(key, ctx, request,
185-
this.grpc.getResolver()::resolveInt,
186-
(Object value) -> ((Long) value).intValue());
187-
}
188-
189-
@Override
190-
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue,
191-
EvaluationContext ctx) {
192-
193-
ResolveObjectRequest request = ResolveObjectRequest.newBuilder().buildPartial();
194-
195-
return this.flagResolver.resolve(key, ctx, request,
196-
this.grpc.getResolver()::resolveObject,
197-
(Object value) -> this.flagResolver.convertObjectResponse((com.google.protobuf.Struct) value));
198-
}
199-
200146
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package dev.openfeature.contrib.providers.flagd;
2+
3+
import dev.openfeature.sdk.EvaluationContext;
4+
import dev.openfeature.sdk.ProviderEvaluation;
5+
import dev.openfeature.sdk.Value;
6+
7+
/**
8+
* A generic contract flag resolving contract for flagd.
9+
*/
10+
public interface Resolver {
11+
void init() throws Exception;
12+
13+
void shutdown() throws Exception;
14+
15+
ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx);
16+
17+
ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx);
18+
19+
ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue, EvaluationContext ctx);
20+
21+
ProviderEvaluation<Integer> integerEvaluation(String key, Integer defaultValue, EvaluationContext ctx);
22+
23+
ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue, EvaluationContext ctx);
24+
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/grpc/EventStreamCallback.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/grpc/EventStreamObserver.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
@Slf4j
1818
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
19-
public class EventStreamObserver implements StreamObserver<EventStreamResponse> {
19+
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
2020
private final Consumer<ProviderState> stateConsumer;
21-
private final Runnable reconnectEventStream;
21+
private final Object sync;
2222
private final Cache cache;
2323

2424
private static final String CONFIGURATION_CHANGE = "configuration_change";
@@ -27,14 +27,15 @@ public class EventStreamObserver implements StreamObserver<EventStreamResponse>
2727

2828
/**
2929
* Create a gRPC stream that get notified about flag changes.
30-
* @param cache cache to update
31-
* @param stateConsumer lambda to call for setting the state
32-
* @param reconnectEventStream callback for trying to recreate the stream
30+
*
31+
* @param sync synchronization object from caller
32+
* @param cache cache to update
33+
* @param stateConsumer lambda to call for setting the state
3334
*/
34-
public EventStreamObserver(Cache cache, Consumer<ProviderState> stateConsumer, Runnable reconnectEventStream) {
35+
EventStreamObserver(Object sync, Cache cache, Consumer<ProviderState> stateConsumer) {
36+
this.sync = sync;
3537
this.cache = cache;
3638
this.stateConsumer = stateConsumer;
37-
this.reconnectEventStream = reconnectEventStream;
3839
}
3940

4041
@Override
@@ -58,7 +59,9 @@ public void onError(Throwable t) {
5859
this.cache.clear();
5960
}
6061
this.stateConsumer.accept(ProviderState.ERROR);
61-
this.reconnectEventStream.run();
62+
63+
// handle last call of this stream
64+
handleEndOfStream();
6265
}
6366

6467
@Override
@@ -67,6 +70,9 @@ public void onCompleted() {
6770
this.cache.clear();
6871
}
6972
this.stateConsumer.accept(ProviderState.ERROR);
73+
74+
// handle last call of this stream
75+
handleEndOfStream();
7076
}
7177

7278
private void handleConfigurationChangeEvent(EventStreamResponse value) {
@@ -93,4 +99,10 @@ private void handleProviderReadyEvent() {
9399
this.cache.clear();
94100
}
95101
}
102+
103+
private void handleEndOfStream() {
104+
synchronized (this.sync) {
105+
this.sync.notify();
106+
}
107+
}
96108
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/grpc/FlagdGrpcInterceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
* FlagdGrpcInterceptor is an interceptor for grpc communication from java-sdk to flagd.
1919
* <a href="https://github.com/open-telemetry/opentelemetry-java-docs">credits</a>
2020
*/
21-
public final class FlagdGrpcInterceptor implements ClientInterceptor {
21+
final class FlagdGrpcInterceptor implements ClientInterceptor {
2222
private static final TextMapSetter<Metadata> SETTER = new Setter();
2323

2424
private final OpenTelemetry openTelemetry;
2525

2626
/**
2727
* Intercept gRPC communication for propagating OpenTelemetry context over the wire.
28+
*
2829
* @param openTelemetry the OpenTelemetry implementation.
2930
*/
3031
public FlagdGrpcInterceptor(@Nonnull final OpenTelemetry openTelemetry) {

0 commit comments

Comments
 (0)