diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java index deaeedfe8..e41121e5c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java @@ -14,6 +14,7 @@ public final class Config { static final String DEFAULT_HOST = "localhost"; static final int DEFAULT_DEADLINE = 500; + static final int DEFAULT_MAX_RETRY_BACKOFF_MS = 12000; static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000; static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5; static final int DEFAULT_MAX_CACHE_SIZE = 1000; @@ -31,6 +32,7 @@ public final class Config { static final String MAX_CACHE_SIZE_ENV_VAR_NAME = "FLAGD_MAX_CACHE_SIZE"; static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS"; static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS"; + static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS"; static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS"; static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR"; /** diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index bba7750de..a2463a946 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -98,10 +98,18 @@ public class FlagdOptions { @Builder.Default private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE); + /** + * Max stream retry backoff in milliseconds. + */ + @Builder.Default + private int retryBackoffMaxMs = + fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF_MS); + /** * Streaming connection deadline in milliseconds. * Set to 0 to disable the deadline. - * Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections. + * Defaults to 600000 (10 minutes); recommended to prevent infrastructure from + * killing idle connections. */ @Builder.Default private int streamDeadlineMs = diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java index 7e187e9cd..39f33af27 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java @@ -24,67 +24,57 @@ import javax.net.ssl.SSLException; /** gRPC channel builder helper. */ +@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this") public class ChannelBuilder { - /** * Controls retry (not-reconnection) policy for failed RPCs. */ @SuppressWarnings({"unchecked", "rawtypes"}) - static final Map SERVICE_CONFIG_WITH_RETRY = new HashMap() { - { - put("methodConfig", Arrays.asList(new HashMap() { - { - put( - "name", - Arrays.asList( - new HashMap() { - { - put("service", "flagd.sync.v1.FlagSyncService"); - } - }, - new HashMap() { - { - put("service", "flagd.evaluation.v1.Service"); - } - })); - put("retryPolicy", new HashMap() { - { - // 1 + 2 + 4 - put("maxAttempts", 3.0); // types used here are important, need to be doubles - put("initialBackoff", "1s"); - put("maxBackoff", "5s"); - put("backoffMultiplier", 2.0); - // status codes to retry on: - put( - "retryableStatusCodes", - Arrays.asList( - /* - * All codes are retryable except OK and DEADLINE_EXCEEDED since - * any others not listed here cause a very tight loop of retries. - * DEADLINE_EXCEEDED is typically a result of a client specified deadline, - * and definitionally should not result in a tight loop (it's a timeout). - */ - Code.CANCELLED.toString(), - Code.UNKNOWN.toString(), - Code.INVALID_ARGUMENT.toString(), - Code.NOT_FOUND.toString(), - Code.ALREADY_EXISTS.toString(), - Code.PERMISSION_DENIED.toString(), - Code.RESOURCE_EXHAUSTED.toString(), - Code.FAILED_PRECONDITION.toString(), - Code.ABORTED.toString(), - Code.OUT_OF_RANGE.toString(), - Code.UNIMPLEMENTED.toString(), - Code.INTERNAL.toString(), - Code.UNAVAILABLE.toString(), - Code.DATA_LOSS.toString(), - Code.UNAUTHENTICATED.toString())); - } - }); - } - })); - } - }; + static Map buildRetryPolicy(final FlagdOptions options) { + return new HashMap() { + { + put("methodConfig", Arrays.asList(new HashMap() { + { + put( + "name", + Arrays.asList( + new HashMap() { + { + put("service", "flagd.sync.v1.FlagSyncService"); + } + }, + new HashMap() { + { + put("service", "flagd.evaluation.v1.Service"); + } + })); + put("retryPolicy", new HashMap() { + { + // 1 + 2 + 4 + put("maxAttempts", 3.0); // types used here are important, need to be doubles + put("initialBackoff", "1s"); + put( + "maxBackoff", + options.getRetryBackoffMaxMs() >= 1000 + ? String.format("%ds", options.getRetryBackoffMaxMs() / 1000) + : "1s"); + put("backoffMultiplier", 2.0); + // status codes to retry on: + put( + "retryableStatusCodes", + Arrays.asList( + /* + * As per gRPC spec, the following status codes are safe to retry: + * UNAVAILABLE, UNKNOWN, + */ + Code.UNKNOWN.toString(), Code.UNAVAILABLE.toString())); + } + }); + } + })); + } + }; + } private ChannelBuilder() {} @@ -109,7 +99,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) { .eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory())) .channelType(EpollDomainSocketChannel.class) .usePlaintext() - .defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY) + .defaultServiceConfig(buildRetryPolicy(options)) .enableRetry() .build(); } @@ -155,7 +145,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) { builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry())); } - return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY) + return builder.defaultServiceConfig(buildRetryPolicy(options)) .enableRetry() .build(); } catch (SSLException ssle) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java index fe66ef59c..6261affe7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java @@ -5,8 +5,6 @@ import dev.openfeature.sdk.ProviderEvent; import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; -import io.grpc.stub.AbstractBlockingStub; -import io.grpc.stub.AbstractStub; import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -16,12 +14,9 @@ /** * A generic GRPC connector that manages connection states, reconnection logic, and event streaming for * GRPC services. - * - * @param the type of the asynchronous stub for the GRPC service - * @param the type of the blocking stub for the GRPC service */ @Slf4j -public class ChannelConnector, K extends AbstractBlockingStub> { +public class ChannelConnector { /** * The GRPC managed channel for managing the underlying GRPC connection. @@ -48,22 +43,11 @@ public class ChannelConnector, K extends AbstractBlock */ public ChannelConnector( final FlagdOptions options, final Consumer onConnectionEvent, ManagedChannel channel) { - this.channel = channel; this.deadline = options.getDeadline(); this.onConnectionEvent = onConnectionEvent; } - /** - * Constructs a {@code ChannelConnector} instance for testing purposes. - * - * @param options the configuration options for the GRPC connection - * @param onConnectionEvent a consumer to handle connection events - */ - public ChannelConnector(final FlagdOptions options, final Consumer onConnectionEvent) { - this(options, onConnectionEvent, ChannelBuilder.nettyChannel(options)); - } - /** * Initializes the GRPC connection by waiting for the channel to be ready and monitoring its state. * diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 3b3494677..d68adaeae 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -2,10 +2,9 @@ import com.google.protobuf.Struct; import dev.openfeature.contrib.providers.flagd.FlagdOptions; +import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; -import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver; -import dev.openfeature.contrib.providers.flagd.resolver.common.StreamResponseModel; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -16,9 +15,11 @@ import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; +import dev.openfeature.sdk.Awaitable; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.grpc.Context; -import io.grpc.Context.CancellableContext; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,15 +40,14 @@ public class SyncStreamQueueSource implements QueueSource { private final AtomicBoolean shutdown = new AtomicBoolean(false); private final int streamDeadline; private final int deadline; + private final int maxBackoffMs; private final String selector; private final String providerId; private final boolean syncMetadataDisabled; - private final ChannelConnector channelConnector; - private final LinkedBlockingQueue> incomingQueue = - new LinkedBlockingQueue<>(QUEUE_SIZE); + private final ChannelConnector channelConnector; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final FlagSyncServiceStub stub; - private final FlagSyncServiceBlockingStub blockingStub; + private final FlagSyncServiceStub flagSyncStub; + private final FlagSyncServiceBlockingStub metadataStub; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -57,17 +57,19 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer(options, onConnectionEvent); - this.stub = FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady(); - this.blockingStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()) + channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options)); + flagSyncStub = + FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady(); + metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()) .withWaitForReady(); } // internal use only protected SyncStreamQueueSource( final FlagdOptions options, - ChannelConnector connectorMock, + ChannelConnector connectorMock, FlagSyncServiceStub stubMock, FlagSyncServiceBlockingStub blockingStubMock) { streamDeadline = options.getStreamDeadlineMs(); @@ -75,23 +77,16 @@ protected SyncStreamQueueSource( selector = options.getSelector(); providerId = options.getProviderId(); channelConnector = connectorMock; - stub = stubMock; + maxBackoffMs = options.getRetryBackoffMaxMs(); + flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); - blockingStub = blockingStubMock; + metadataStub = blockingStubMock; } /** Initialize sync stream connector. */ public void init() throws Exception { channelConnector.initialize(); - Thread listener = new Thread(() -> { - try { - observeSyncStream(); - } catch (InterruptedException e) { - log.warn("gRPC event stream interrupted, flag configurations are stale", e); - Thread.currentThread().interrupt(); - } - }); - + Thread listener = new Thread(this::observeSyncStream); listener.setDaemon(true); listener.start(); } @@ -114,84 +109,76 @@ public void shutdown() throws InterruptedException { } /** Contains blocking calls, to be used concurrently. */ - private void observeSyncStream() throws InterruptedException { - + private void observeSyncStream() { log.info("Initializing sync stream observer"); // outer loop for re-issuing the stream request - // "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions + // "waitForReady" on the channel, plus our retry policy slow this loop down in + // error conditions while (!shutdown.get()) { + try { + log.debug("Initializing sync stream request"); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue); + try { + observer.metadata = getMetadata(); + } catch (Exception metaEx) { + // retry if getMetadata fails + String message = metaEx.getMessage(); + log.debug("Metadata request error: {}, will restart", message, metaEx); + enqueueError(String.format("Error in getMetadata request: %s", message)); + Thread.sleep(this.maxBackoffMs); + continue; + } - log.debug("Initializing sync stream request"); - final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder(); - GetMetadataResponse metadataResponse = null; + try { + syncFlags(observer); + } catch (Exception ex) { + log.error("Unexpected sync stream exception, will restart.", ex); + enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + Thread.sleep(this.maxBackoffMs); + } + } catch (InterruptedException ie) { + log.debug("Stream loop interrupted, most likely shutdown was invoked", ie); + } + } - // create a context which exists to track and cancel the stream - try (CancellableContext context = Context.current().withCancellation()) { + log.info("Shutdown invoked, exiting event stream listener"); + } - restart(); // start the stream with the context + // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584 + private Struct getMetadata() { + if (syncMetadataDisabled) { + return null; + } - // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584 - if (!syncMetadataDisabled) { - try { - FlagSyncServiceBlockingStub localStub = blockingStub; + FlagSyncServiceBlockingStub localStub = metadataStub; - if (deadline > 0) { - localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); - } + if (deadline > 0) { + localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); + } - metadataResponse = localStub.getMetadata(metadataRequest.build()); - } catch (Exception metaEx) { - // cancel the stream if the getMetadata fails - // we can keep this log quiet since the stream is cancelled/restarted with this exception - log.debug("Metadata exception: {}, cancelling stream", metaEx.getMessage(), metaEx); - context.cancel(metaEx); - } - } + try { + GetMetadataResponse metadataResponse = localStub.getMetadata(GetMetadataRequest.getDefaultInstance()); - // inner loop for handling messages - while (!shutdown.get() && !Context.current().isCancelled()) { - final StreamResponseModel taken = incomingQueue.take(); - if (taken.isComplete()) { - log.debug("Sync stream completed, will restart"); - // The stream is complete, we still try to reconnect - break; - } - - Throwable streamException = taken.getError(); - if (streamException != null) { - log.debug("Exception in stream RPC, streamException {}, will restart", streamException); - if (!outgoingQueue.offer(new QueuePayload( - QueuePayloadType.ERROR, - String.format("Error from stream: %s", streamException.getMessage())))) { - log.error("Failed to convey ERROR status, queue is full"); - } - break; - } - - final SyncFlagsResponse flagsResponse = taken.getResponse(); - final String data = flagsResponse.getFlagConfiguration(); - log.debug("Got stream response: {}", data); - - Struct syncContext = null; - if (flagsResponse.hasSyncContext()) { - syncContext = flagsResponse.getSyncContext(); - } else if (metadataResponse != null) { - syncContext = metadataResponse.getMetadata(); - } - - if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) { - log.error("Stream writing failed"); - } - } + if (metadataResponse != null) { + return metadataResponse.getMetadata(); } - } - log.info("Shutdown invoked, exiting event stream listener"); + return null; + } catch (StatusRuntimeException e) { + // In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we + // can ignore the error + if (e.getStatus() != null + && Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) { + return null; + } + + throw e; + } } - private void restart() { - FlagSyncServiceStub localStub = stub; // don't mutate the stub + private void syncFlags(SyncStreamObserver streamObserver) { + FlagSyncServiceStub localStub = flagSyncStub; // don't mutate the stub if (streamDeadline > 0) { localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS); } @@ -205,6 +192,58 @@ private void restart() { syncRequest.setProviderId(this.providerId); } - localStub.syncFlags(syncRequest.build(), new QueueingStreamObserver(incomingQueue)); + localStub.syncFlags(syncRequest.build(), streamObserver); + + streamObserver.done.await(); + } + + private void enqueueError(String message) { + enqueueError(outgoingQueue, message); + } + + private static void enqueueError(BlockingQueue queue, String message) { + if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { + log.error("Failed to convey ERROR status, queue is full"); + } + } + + private static class SyncStreamObserver implements StreamObserver { + private final BlockingQueue outgoingQueue; + private final Awaitable done = new Awaitable(); + + private Struct metadata; + + public SyncStreamObserver(BlockingQueue outgoingQueue) { + this.outgoingQueue = outgoingQueue; + } + + @Override + public void onNext(SyncFlagsResponse syncFlagsResponse) { + final String data = syncFlagsResponse.getFlagConfiguration(); + log.debug("Got stream response: {}", data); + + Struct syncContext = syncFlagsResponse.hasSyncContext() ? syncFlagsResponse.getSyncContext() : metadata; + + if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) { + log.error("Stream writing failed"); + } + } + + @Override + public void onError(Throwable throwable) { + try { + String message = throwable != null ? throwable.getMessage() : "unknown"; + log.debug("Stream error: {}, will restart", message, throwable); + enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + } finally { + done.wakeup(); + } + } + + @Override + public void onCompleted() { + log.debug("Sync stream completed, will restart"); + done.wakeup(); + } } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 466c26359..1f3101d00 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -9,6 +9,7 @@ import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.Resolver; +import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver; @@ -59,7 +60,7 @@ public final class RpcResolver implements Resolver { private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); - private final ChannelConnector connector; + private final ChannelConnector connector; private final Cache cache; private final ResolveStrategy strategy; private final FlagdOptions options; @@ -83,7 +84,7 @@ public RpcResolver( this.strategy = ResolveFactory.getStrategy(options); this.options = options; incomingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - this.connector = new ChannelConnector<>(options, onProviderEvent); + this.connector = new ChannelConnector(options, onProviderEvent, ChannelBuilder.nettyChannel(options)); this.onProviderEvent = onProviderEvent; this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady(); this.blockingStub = @@ -97,7 +98,7 @@ protected RpcResolver( final Consumer onProviderEvent, ServiceStub mockStub, ServiceBlockingStub mockBlockingStub, - ChannelConnector connector) { + ChannelConnector connector) { this.cache = cache; this.strategy = ResolveFactory.getStrategy(options); this.options = options; diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilderTest.java index 7a8e777a7..cee418010 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilderTest.java @@ -102,6 +102,12 @@ void testNettyChannel_withSocketPath_withRetryPolicy() { NettyChannelBuilder mockBuilder = mock(NettyChannelBuilder.class); ManagedChannel mockChannel = mock(ManagedChannel.class); + // Input options + FlagdOptions options = FlagdOptions.builder() + .socketPath("/path/to/socket") + .keepAlive(1000) + .build(); + nettyMock .when(() -> NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))) .thenReturn(mockBuilder); @@ -110,18 +116,12 @@ void testNettyChannel_withSocketPath_withRetryPolicy() { when(mockBuilder.eventLoopGroup(any(MultiThreadIoEventLoopGroup.class))) .thenReturn(mockBuilder); when(mockBuilder.channelType(EpollDomainSocketChannel.class)).thenReturn(mockBuilder); - when(mockBuilder.defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY)) + when(mockBuilder.defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options))) .thenReturn(mockBuilder); doReturn(mockBuilder).when(mockBuilder).enableRetry(); when(mockBuilder.usePlaintext()).thenReturn(mockBuilder); when(mockBuilder.build()).thenReturn(mockChannel); - // Input options - FlagdOptions options = FlagdOptions.builder() - .socketPath("/path/to/socket") - .keepAlive(1000) - .build(); - // Call method under test ManagedChannel channel = ChannelBuilder.nettyChannel(options); @@ -131,7 +131,7 @@ void testNettyChannel_withSocketPath_withRetryPolicy() { verify(mockBuilder).keepAliveTime(1000, TimeUnit.MILLISECONDS); verify(mockBuilder).eventLoopGroup(any(MultiThreadIoEventLoopGroup.class)); verify(mockBuilder).channelType(EpollDomainSocketChannel.class); - verify(mockBuilder).defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY); + verify(mockBuilder).defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options)); verify(mockBuilder).usePlaintext(); verify(mockBuilder).build(); } @@ -165,7 +165,7 @@ void testNettyChannel_withRetryPolicy() { // Assertions assertThat(channel).isEqualTo(mockChannel); nettyMock.verify(() -> NettyChannelBuilder.forTarget("localhost:8080")); - verify(mockBuilder).defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY); + verify(mockBuilder).defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options)); verify(mockBuilder).enableRetry(); verify(mockBuilder).build(); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java index ed97f971c..a1f91d450 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java @@ -94,8 +94,7 @@ void whenShuttingDownGrpcConnectorConsumerReceivesDisconnectedEvent() throws Exc sync.countDown(); }; - ChannelConnector instance = - new ChannelConnector<>(FlagdOptions.builder().build(), testConsumer, testChannel); + ChannelConnector instance = new ChannelConnector(FlagdOptions.builder().build(), testConsumer, testChannel); instance.initialize(); // when shutting grpc connector @@ -122,8 +121,7 @@ void testMonitorChannelState(ConnectivityState state) throws Exception { Consumer testConsumer = spy(Consumer.class); - ChannelConnector instance = - new ChannelConnector<>(FlagdOptions.builder().build(), testConsumer, channel); + ChannelConnector instance = new ChannelConnector(FlagdOptions.builder().build(), testConsumer, channel); instance.initialize(); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index 274392d5b..02098b1cb 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -16,7 +16,6 @@ import com.google.protobuf.Struct; import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; -import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub; @@ -24,6 +23,8 @@ import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -32,14 +33,15 @@ import org.mockito.stubbing.Answer; class SyncStreamQueueSourceTest { - private ChannelConnector mockConnector; + private ChannelConnector mockConnector; private FlagSyncServiceBlockingStub blockingStub; private FlagSyncServiceStub stub; - private QueueingStreamObserver observer; + private FlagSyncServiceStub errorStub; + private StreamObserver observer; private CountDownLatch latch; // used to wait for observer to be initialized @BeforeEach - public void init() throws Exception { + public void setup() throws Exception { blockingStub = mock(FlagSyncServiceBlockingStub.class); when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub); when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance()); @@ -51,28 +53,60 @@ public void init() throws Exception { when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub); doAnswer((Answer) invocation -> { Object[] args = invocation.getArguments(); - observer = (QueueingStreamObserver) args[1]; + observer = (StreamObserver) args[1]; latch.countDown(); return null; }) .when(stub) - .syncFlags(any(SyncFlagsRequest.class), any(QueueingStreamObserver.class)); // Mock the initialize - // method + .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + + errorStub = mock(FlagSyncServiceStub.class); + when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub); + doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + observer = (StreamObserver) args[1]; + latch.countDown(); + throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND); + }) + .when(errorStub) + .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + } + + @Test + void initError_DoesNotBusyWait() throws Exception { + // make sure we do not spin in a busy loop on errors + + int maxBackoffMs = 1000; + SyncStreamQueueSource queueSource = new SyncStreamQueueSource( + FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), mockConnector, errorStub, blockingStub); + latch = new CountDownLatch(1); + queueSource.init(); + latch.await(); + + BlockingQueue streamQueue = queueSource.getStreamQueue(); + QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); + assertNotNull(payload); + assertEquals(QueuePayloadType.ERROR, payload.getType()); + Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for reties + + // should have retried the stream (2 calls); initial + 1 retry + // it's very important that the retry count is low, to confirm no busy-loop + verify(errorStub, times(2)).syncFlags(any(), any()); } @Test void onNextEnqueuesDataPayload() throws Exception { - SyncStreamQueueSource connector = + SyncStreamQueueSource queueSource = new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub); latch = new CountDownLatch(1); - connector.init(); + queueSource.init(); latch.await(); // fire onNext (data) event observer.onNext(SyncFlagsResponse.newBuilder().build()); // should enqueue data payload - BlockingQueue streamQueue = connector.getStreamQueue(); + BlockingQueue streamQueue = queueSource.getStreamQueue(); QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); assertNotNull(payload.getSyncContext()); @@ -84,17 +118,17 @@ void onNextEnqueuesDataPayload() throws Exception { @Test void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception { // disable GetMetadata call - SyncStreamQueueSource connector = new SyncStreamQueueSource( + SyncStreamQueueSource queueSource = new SyncStreamQueueSource( FlagdOptions.builder().syncMetadataDisabled(true).build(), mockConnector, stub, blockingStub); latch = new CountDownLatch(1); - connector.init(); + queueSource.init(); latch.await(); // fire onNext (data) event observer.onNext(SyncFlagsResponse.newBuilder().build()); // should enqueue data payload - BlockingQueue streamQueue = connector.getStreamQueue(); + BlockingQueue streamQueue = queueSource.getStreamQueue(); QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); assertNull(payload.getSyncContext()); @@ -108,10 +142,10 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception { @Test void onNextEnqueuesDataPayloadWithSyncContext() throws Exception { // disable GetMetadata call - SyncStreamQueueSource connector = + SyncStreamQueueSource queueSource = new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub); latch = new CountDownLatch(1); - connector.init(); + queueSource.init(); latch.await(); // fire onNext (data) event @@ -120,7 +154,7 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception { SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build()); // should enqueue data payload - BlockingQueue streamQueue = connector.getStreamQueue(); + BlockingQueue streamQueue = queueSource.getStreamQueue(); QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); assertEquals(syncContext, payload.getSyncContext()); @@ -131,10 +165,10 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception { @Test void onErrorEnqueuesDataPayload() throws Exception { - SyncStreamQueueSource connector = + SyncStreamQueueSource queueSource = new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub); latch = new CountDownLatch(1); - connector.init(); + queueSource.init(); latch.await(); // fire onError event and reset latch @@ -142,7 +176,7 @@ void onErrorEnqueuesDataPayload() throws Exception { observer.onError(new Exception("fake exception")); // should enqueue error payload - BlockingQueue streamQueue = connector.getStreamQueue(); + BlockingQueue streamQueue = queueSource.getStreamQueue(); QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); assertEquals(QueuePayloadType.ERROR, payload.getType()); @@ -153,10 +187,10 @@ void onErrorEnqueuesDataPayload() throws Exception { @Test void onCompletedEnqueuesDataPayload() throws Exception { - SyncStreamQueueSource connector = + SyncStreamQueueSource queueSource = new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub); latch = new CountDownLatch(1); - connector.init(); + queueSource.init(); latch.await(); // fire onCompleted event (graceful stream end) and reset latch @@ -164,7 +198,7 @@ void onCompletedEnqueuesDataPayload() throws Exception { observer.onCompleted(); // should enqueue error payload - BlockingQueue streamQueue = connector.getStreamQueue(); + BlockingQueue streamQueue = queueSource.getStreamQueue(); assertTrue(streamQueue.isEmpty()); // should have restarted the stream (2 calls) latch.await(); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java index eb9a55c13..4b7acb569 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java @@ -27,7 +27,7 @@ import org.mockito.stubbing.Answer; class RpcResolverTest { - private ChannelConnector mockConnector; + private ChannelConnector mockConnector; private ServiceBlockingStub blockingStub; private ServiceStub stub; private QueueingStreamObserver observer;