From 8b7f5747aa321c8561eb2f53c7822b78324fd3a8 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Mon, 10 Nov 2025 15:08:51 +0100 Subject: [PATCH 1/3] fix(flagd): no retry for certain error codes, implement test steps Signed-off-by: lea konvalinka --- .../contrib/providers/flagd/FlagdOptions.java | 9 +++++++++ .../connector/sync/SyncStreamQueueSource.java | 18 +++++++++++++++--- .../flagd/e2e/steps/ProviderSteps.java | 14 ++++++++++++++ .../providers/flagd/e2e/steps/Utils.java | 3 +++ .../flagd/e2e/steps/config/ConfigSteps.java | 1 + providers/flagd/test-harness | 2 +- 6 files changed, 43 insertions(+), 4 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index a2463a946..9b5bb61a5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -10,6 +10,7 @@ import io.grpc.ClientInterceptor; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import lombok.Builder; @@ -122,6 +123,14 @@ public class FlagdOptions { @Builder.Default private int retryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD); + + /** + * List of grpc response status codes for which failed connections are not retried. + * Defaults to empty list + */ + @Builder.Default + private List nonRetryableStatusCodes = new ArrayList<>(); + /** * Selector to be used with flag sync gRPC contract. **/ diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 196ab77a6..0cc787db4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -16,10 +16,12 @@ import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; import dev.openfeature.sdk.Awaitable; +import dev.openfeature.sdk.exceptions.FatalError; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -49,6 +51,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; + private final List nonRetryableStatusCodes; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -65,6 +68,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer Date: Fri, 14 Nov 2025 12:14:38 +0100 Subject: [PATCH 2/3] attempt to handle fatal error Signed-off-by: lea konvalinka --- .../contrib/providers/flagd/FlagdOptions.java | 2 +- .../providers/flagd/FlagdProvider.java | 27 +++++++----- .../resolver/process/InProcessResolver.java | 3 ++ .../resolver/process/storage/FlagStore.java | 5 +++ .../storage/connector/QueuePayloadType.java | 3 +- .../connector/sync/SyncStreamQueueSource.java | 42 ++++++++++++++----- .../flagd/e2e/steps/ProviderSteps.java | 2 +- .../flagd/e2e/steps/config/ConfigSteps.java | 1 - 8 files changed, 60 insertions(+), 25 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 9b5bb61a5..993f55bdd 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -129,7 +129,7 @@ public class FlagdOptions { * Defaults to empty list */ @Builder.Default - private List nonRetryableStatusCodes = new ArrayList<>(); + private List fatalStatusCodes = new ArrayList<>(); /** * Selector to be used with flag sync gRPC contract. diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 4ce6e06ee..082f5a59e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -5,6 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; +import dev.openfeature.sdk.ErrorCode; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.Hook; @@ -135,7 +136,7 @@ public void initialize(EvaluationContext evaluationContext) throws Exception { public void shutdown() { synchronized (syncResources) { try { - if (!syncResources.isInitialized() || syncResources.isShutDown()) { + if (syncResources.isShutDown()) { return; } @@ -193,7 +194,7 @@ EvaluationContext getEnrichedContext() { @SuppressWarnings("checkstyle:fallthrough") private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { - log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); + log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); synchronized (syncResources) { /* * We only use Error and Ready as previous states. @@ -222,20 +223,26 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { onReady(); syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; - - case PROVIDER_ERROR: - if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { - onError(); - syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); + case PROVIDER_STALE: + if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) { + onStale(); + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE); } break; - + case PROVIDER_ERROR: + onError(); + break; default: log.warn("Unknown event {}", flagdProviderEvent.getEvent()); } } } + private void onError() { + this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); + shutdown(); + } + private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { this.emitProviderConfigurationChanged(ProviderEventDetails.builder() .flagsChanged(flagdProviderEvent.getFlagsChanged()) @@ -255,7 +262,7 @@ private void onReady() { ProviderEventDetails.builder().message("connected to flagd").build()); } - private void onError() { + private void onStale() { log.debug( "Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.", gracePeriod); @@ -270,7 +277,7 @@ private void onError() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) { log.error( "Provider did not reconnect successfully within {}s. Emitting ERROR event...", gracePeriod); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e54c938cf..f313d943b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -77,6 +77,9 @@ public void init() throws Exception { storageStateChange.getSyncMetadata())); log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; + case STALE: + onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + break; case ERROR: onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); break; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index eaa3dfa5f..a01f93c23 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc } break; case ERROR: + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { + log.warn("Failed to convey STALE status, queue is full"); + } + break; + case FATAL: if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { log.warn("Failed to convey ERROR status, queue is full"); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java index 93675fb60..74e02912e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java @@ -3,5 +3,6 @@ /** Payload type emitted by {@link QueueSource}. */ public enum QueuePayloadType { DATA, - ERROR + ERROR, + FATAL } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 0cc787db4..915855b27 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -16,7 +16,6 @@ import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; import dev.openfeature.sdk.Awaitable; -import dev.openfeature.sdk.exceptions.FatalError; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -51,7 +50,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; - private final List nonRetryableStatusCodes; + private final List fatalStatusCodes; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -68,7 +67,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer queue, String message) { if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { log.error("Failed to convey ERROR status, queue is full"); } } + private static void enqueueFatal(BlockingQueue queue, String message) { + if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { + log.error("Failed to convey FATAL status, queue is full"); + } + } + private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; private final AtomicBoolean shouldThrottle; private final Awaitable done = new Awaitable(); + private final List fatalStatusCodes; private Struct metadata; - public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle, List fatalStatusCodes) { this.outgoingQueue = outgoingQueue; this.shouldThrottle = shouldThrottle; + this.fatalStatusCodes = fatalStatusCodes; } @Override @@ -260,9 +275,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { @Override public void onError(Throwable throwable) { try { + Status status = Status.fromThrowable(throwable); String message = throwable != null ? throwable.getMessage() : "unknown"; log.debug("Stream error: {}, will restart", message, throwable); - enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + if (fatalStatusCodes.contains(status.getCode())) { + enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); + } else { + enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + } // Set throttling flag to ensure backoff before retry this.shouldThrottle.set(true); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index e2c7eef1e..230446f88 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -202,6 +202,6 @@ public void the_flag_was_modded() { @Then("the client is in {} state") public void the_client_is_in_fatal_state(String clientState) { - assertThat(state.client.getProviderState()).isEqualTo(ProviderState.FATAL); + assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java index c1dad8dae..25a6cdc7d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java @@ -121,7 +121,6 @@ private static String mapOptionNames(String option) { propertyMapper.put("keepAliveTime", "keepAlive"); propertyMapper.put("retryBackoffMaxMs", "keepAlive"); propertyMapper.put("cache", "cacheType"); - propertyMapper.put("fatalStatusCodes", "nonRetryableStatusCodes"); if (propertyMapper.get(option) != null) { option = propertyMapper.get(option); From 654c8dad658e891f61cd8af51ed7385e4a8cabe2 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Mon, 24 Nov 2025 10:49:57 +0100 Subject: [PATCH 3/3] fix(flagd): update testbed + step, fix event Signed-off-by: lea konvalinka --- .../providers/flagd/resolver/common/ChannelConnector.java | 2 +- .../contrib/providers/flagd/e2e/steps/ProviderSteps.java | 4 ++-- .../openfeature/contrib/providers/flagd/e2e/steps/Utils.java | 5 ++++- providers/flagd/test-harness | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java index 6261affe7..032b1766c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java @@ -86,7 +86,7 @@ private void onStateChange() { log.debug("Channel state changed to: {}", currentState); if (currentState == ConnectivityState.TRANSIENT_FAILURE || currentState == ConnectivityState.SHUTDOWN) { this.onConnectionEvent.accept(new FlagdProviderEvent( - ProviderEvent.PROVIDER_ERROR, Collections.emptyList(), new ImmutableStructure())); + ProviderEvent.PROVIDER_STALE, Collections.emptyList(), new ImmutableStructure())); } if (currentState != ConnectivityState.SHUTDOWN) { log.debug("continuing to monitor the grpc channel"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 230446f88..0467c56e5 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -200,8 +200,8 @@ public void the_flag_was_modded() { .statusCode(200); } - @Then("the client is in {} state") - public void the_client_is_in_fatal_state(String clientState) { + @Then("the client should be in {} state") + public void the_client_should_be_in_fatal_state(String clientState) { assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java index c50c08397..a89f8560e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java @@ -4,8 +4,10 @@ import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; import dev.openfeature.sdk.Value; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; public final class Utils { @@ -39,7 +41,8 @@ public static Object convert(String value, String type) throws ClassNotFoundExce case "CacheType": return CacheType.valueOf(value.toUpperCase()).getValue(); case "StringList": - return List.of(value); + return value.isEmpty() ? List.of() : Arrays.stream(value.split(",")).map(String::trim).collect( + Collectors.toList()); case "Object": return Value.objectToValue(new ObjectMapper().readValue(value, Object.class)); } diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index bde8977a4..6948dcbab 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit bde8977a4fa2b59ba4359bcf902e9adf4555d085 +Subproject commit 6948dcbabef284fae4a4c1d03ce5e0bd9ea34c17