Skip to content

Commit 6f89ff0

Browse files
leakonvalinkaroot
authored and committed
fix(flagd): no retry for certain error codes, implement test steps
Signed-off-by: root <root@DT-7ZQDKC4.>
1 parent 0b720f3 commit 6f89ff0

File tree

6 files changed

+43
-4
lines changed

6 files changed

+43
-4
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.grpc.ClientInterceptor;
1111
import io.opentelemetry.api.GlobalOpenTelemetry;
1212
import io.opentelemetry.api.OpenTelemetry;
13+
import java.util.ArrayList;
1314
import java.util.List;
1415
import java.util.function.Function;
1516
import lombok.Builder;
@@ -122,6 +123,14 @@ public class FlagdOptions {
122123
@Builder.Default
123124
private int retryGracePeriod =
124125
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
126+
127+
/**
128+
* List of gRPC response status codes for which failed connections are not retried.
129+
* Defaults to an empty list.
130+
*/
131+
@Builder.Default
132+
private List<String> nonRetryableStatusCodes = new ArrayList<>();
133+
125134
/**
126135
* Selector to be used with flag sync gRPC contract.
127136
**/

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
1717
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1818
import dev.openfeature.sdk.Awaitable;
19+
import dev.openfeature.sdk.exceptions.FatalError;
1920
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2021
import io.grpc.Status;
2122
import io.grpc.StatusRuntimeException;
2223
import io.grpc.stub.StreamObserver;
24+
import java.util.List;
2325
import java.util.concurrent.BlockingQueue;
2426
import java.util.concurrent.LinkedBlockingQueue;
2527
import java.util.concurrent.TimeUnit;
@@ -48,6 +50,7 @@ public class SyncStreamQueueSource implements QueueSource {
4850
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
4951
private final FlagSyncServiceStub flagSyncStub;
5052
private final FlagSyncServiceBlockingStub metadataStub;
53+
private final List<String> nonRetryableStatusCodes;
5154

5255
/**
5356
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -64,6 +67,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6467
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
6568
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
6669
.withWaitForReady();
70+
nonRetryableStatusCodes = options.getNonRetryableStatusCodes();
6771
}
6872

6973
// internal use only
@@ -81,6 +85,7 @@ protected SyncStreamQueueSource(
8185
flagSyncStub = stubMock;
8286
syncMetadataDisabled = options.isSyncMetadataDisabled();
8387
metadataStub = blockingStubMock;
88+
nonRetryableStatusCodes = options.getNonRetryableStatusCodes();
8489
}
8590

8691
/** Initialize sync stream connector. */
@@ -121,8 +126,11 @@ private void observeSyncStream() {
121126
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
122127
try {
123128
observer.metadata = getMetadata();
124-
} catch (Exception metaEx) {
125-
// retry if getMetadata fails
129+
} catch (StatusRuntimeException metaEx) {
130+
if (nonRetryableStatusCodes.contains(metaEx.getStatus().getCode().name())) {
131+
throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus());
132+
}
133+
// retry for other status codes
126134
String message = metaEx.getMessage();
127135
log.debug("Metadata request error: {}, will restart", message, metaEx);
128136
enqueueError(String.format("Error in getMetadata request: %s", message));
@@ -132,7 +140,11 @@ private void observeSyncStream() {
132140

133141
try {
134142
syncFlags(observer);
135-
} catch (Exception ex) {
143+
} catch (StatusRuntimeException ex) {
144+
// Match on the status code name (e.g. "PERMISSION_DENIED"), as the metadata branch does;
// Status.toString() also renders description and cause, so it never equals a configured code.
if (nonRetryableStatusCodes.contains(ex.getStatus().getCode().name())) {
145+
throw new FatalError("Failed to connect to sync stream, not retrying for error " + ex.getStatus());
146+
}
147+
// retry for other status codes
136148
log.error("Unexpected sync stream exception, will restart.", ex);
137149
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
138150
Thread.sleep(this.maxBackoffMs);

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.openfeature.contrib.providers.flagd.e2e.steps;
22

33
import static io.restassured.RestAssured.when;
4+
import static org.assertj.core.api.Assertions.assertThat;
45

56
import dev.openfeature.contrib.providers.flagd.Config;
67
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
@@ -9,10 +10,12 @@
910
import dev.openfeature.contrib.providers.flagd.e2e.State;
1011
import dev.openfeature.sdk.FeatureProvider;
1112
import dev.openfeature.sdk.OpenFeatureAPI;
13+
import dev.openfeature.sdk.ProviderState;
1214
import io.cucumber.java.After;
1315
import io.cucumber.java.AfterAll;
1416
import io.cucumber.java.BeforeAll;
1517
import io.cucumber.java.en.Given;
18+
import io.cucumber.java.en.Then;
1619
import io.cucumber.java.en.When;
1720
import java.io.File;
1821
import java.io.IOException;
@@ -33,6 +36,7 @@
3336
public class ProviderSteps extends AbstractSteps {
3437

3538
public static final int UNAVAILABLE_PORT = 9999;
39+
public static final int FORBIDDEN_PORT = 9212;
3640
static ComposeContainer container;
3741

3842
static Path sharedTempDir;
@@ -51,6 +55,7 @@ public static void beforeAll() throws IOException {
5155
.withExposedService("flagd", 8015, Wait.forListeningPort())
5256
.withExposedService("flagd", 8080, Wait.forListeningPort())
5357
.withExposedService("envoy", 9211, Wait.forListeningPort())
58+
.withExposedService("envoy", 9212, Wait.forListeningPort())
5459
.withStartupTimeout(Duration.ofSeconds(45));
5560
container.start();
5661
}
@@ -87,6 +92,10 @@ public void setupProvider(String providerType) throws InterruptedException {
8792
}
8893
wait = false;
8994
break;
95+
case "forbidden":
96+
state.builder.port(container.getServicePort("envoy", FORBIDDEN_PORT));
97+
wait = false;
98+
break;
9099
case "socket":
91100
this.state.providerType = ProviderType.SOCKET;
92101
String socketPath =
@@ -190,4 +199,9 @@ public void the_flag_was_modded() {
190199
.then()
191200
.statusCode(200);
192201
}
202+
203+
@Then("the client is in {} state")
204+
public void the_client_is_in_fatal_state(String clientState) {
205+
// Assert the state captured from the step text (e.g. "fatal" -> FATAL) instead of a hard-coded FATAL,
// so the step verifies whatever state the scenario names; valueOf throws on an unknown state name.
assertThat(state.client.getProviderState())
        .isEqualTo(ProviderState.valueOf(clientState.trim().replace(' ', '_').toUpperCase()));
206+
}
193207
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType;
55
import dev.openfeature.sdk.Value;
66
import java.io.IOException;
7+
import java.util.List;
78
import java.util.Objects;
89
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
910

@@ -37,6 +38,8 @@ public static Object convert(String value, String type) throws ClassNotFoundExce
3738
}
3839
case "CacheType":
3940
return CacheType.valueOf(value.toUpperCase()).getValue();
41+
case "StringList":
42+
return List.of(value);
4043
case "Object":
4144
return Value.objectToValue(new ObjectMapper().readValue(value, Object.class));
4245
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ private static String mapOptionNames(String option) {
121121
propertyMapper.put("keepAliveTime", "keepAlive");
122122
propertyMapper.put("retryBackoffMaxMs", "keepAlive");
123123
propertyMapper.put("cache", "cacheType");
124+
propertyMapper.put("fatalStatusCodes", "nonRetryableStatusCodes");
124125

125126
if (propertyMapper.get(option) != null) {
126127
option = propertyMapper.get(option);

providers/flagd/test-harness

0 commit comments

Comments
 (0)