Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
893cd1c
fix(flagd): Remove one level of queueing in SyncStreamQueueSource and…
guidobrei Sep 16, 2025
67b190d
fix(flagd): Use default instance of GetMetadataRequest
guidobrei Sep 17, 2025
5b47a15
fix(flagd): Keep error reporting equivalent to previous implementation
guidobrei Sep 17, 2025
e972a80
fix(flagd): Fix checkstzle issues
guidobrei Sep 17, 2025
7991058
fix(flagd): Fail early in syncFlags
guidobrei Sep 17, 2025
f4a93d6
fix(flagd): Add thread interruption
guidobrei Sep 17, 2025
774bdee
fix(flagd): Fix Spotless complaints
guidobrei Sep 18, 2025
acab1f3
fix(flagd): Get rid of gRPC context cancellation
guidobrei Sep 19, 2025
3ad54de
fix(flagd): Offer error if getMetadata fails
guidobrei Sep 19, 2025
fc963ce
fixup: spotless
toddbaert Sep 23, 2025
c7c1a3a
Fix error message
guidobrei Sep 23, 2025
0c41114
fix(flagd): Ignore unimplemented getMetadata
guidobrei Sep 23, 2025
0c29e37
fix(flagd): Update retryable status codes in retry logic
guidobrei Sep 24, 2025
8baba19
fix(flagd): Simplified ChannelConnector
guidobrei Sep 24, 2025
ddffc09
fix(flagd): Extended retry logic for SyncFlags
guidobrei Sep 24, 2025
1920779
fix(flagd): Properly handle metadata unimplemented status code
guidobrei Oct 1, 2025
86de034
fix(flagd): Remove unused imports
guidobrei Oct 1, 2025
9a2ef7b
fix(flagd): Please spotless
guidobrei Oct 3, 2025
95914e6
fixup: add sleep delay to prevent busy loop
toddbaert Oct 13, 2025
d0286a9
Apply suggestion from @toddbaert
toddbaert Oct 14, 2025
455b362
fixup: default
toddbaert Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public final class Config {
static final String DEFAULT_HOST = "localhost";

static final int DEFAULT_DEADLINE = 500;
static final int DEFAULT_MAX_RETRY_BACKOFF = 5;
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
Expand All @@ -31,6 +32,7 @@ public final class Config {
static final String MAX_CACHE_SIZE_ENV_VAR_NAME = "FLAGD_MAX_CACHE_SIZE";
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,18 @@ public class FlagdOptions {
@Builder.Default
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);

/**
* Max stream retry backoff in milliseconds.
*/
@Builder.Default
private int retryBackoffMaxMs =
fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already a defined param we never implemented, so good to have now.

We use this also for the sleep time for non-transient errors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're mixing here ms and s.

Is static final int DEFAULT_MAX_RETRY_BACKOFF = 5; a time unit or count?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a time. I will make the default name more clear. It should also be 12000 as per https://flagd.dev/providers/rust/?h=flagd_retry_backoff_max_ms#configuration-options... nice catch.... this was leftover from my initial version before I realized we had an unimplemented var for this.


/**
* Streaming connection deadline in milliseconds.
* Set to 0 to disable the deadline.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from
* killing idle connections.
*/
@Builder.Default
private int streamDeadlineMs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,95 +24,57 @@
import javax.net.ssl.SSLException;

/** gRPC channel builder helper. */
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this")
public class ChannelBuilder {

@SuppressWarnings({"unchecked", "rawtypes"})
private static final Map<String, ?> DEFAULT_RETRY_POLICY = new HashMap() {
{
// 1s + 2s + 4s
put("maxAttempts", 3.0); // types used here are important, need to be doubles
put("initialBackoff", "1s");
put("maxBackoff", "5s");
put("backoffMultiplier", 2.0);
// status codes to retry on:
put(
"retryableStatusCodes",
Arrays.asList(
/*
* Only states UNAVAILABLE and UNKNOWN should be retried. All
* other failure states will probably not be resolved with a simple retry.
*/
Code.UNAVAILABLE.toString(), Code.UNKNOWN.toString()));
}
};

/**
* Controls retry (not-reconnection) policy for failed RPCs.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static final Map<String, ?> SERVICE_CONFIG_WITH_RETRY = new HashMap() {
{
put("methodConfig", Arrays.asList(new HashMap() {
{
put(
"name",
Arrays.asList(
new HashMap() {
{
put("service", "flagd.sync.v1.FlagSyncService");
}
},
new HashMap() {
{
put("service", "flagd.evaluation.v1.Service");
}
}));
put("retryPolicy", DEFAULT_RETRY_POLICY);
}

{
put("name", Arrays.asList(new HashMap() {
{
put("service", "flagd.sync.v1.FlagSyncService");
put("method", "SyncFlags");
}
}));
put("retryPolicy", new HashMap(DEFAULT_RETRY_POLICY) {
{
// 1s + 2s + 4s + 5s + 5s + 5s + 5s + 5s + 5s + 5s
put("maxAttempts", 12.0);
// for streaming Retry on more status codes
put(
"retryableStatusCodes",
Arrays.asList(
/*
* All codes are retryable except OK and DEADLINE_EXCEEDED since
* any others not listed here cause a very tight loop of retries.
* DEADLINE_EXCEEDED is typically a result of a client specified
* deadline,and definitionally should not result in a tight loop
* (it's a timeout).
*/
Code.CANCELLED.toString(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed - these were only here to slow down our loop, which we do with a sleep now.

Code.UNKNOWN.toString(),
Code.INVALID_ARGUMENT.toString(),
Code.NOT_FOUND.toString(),
Code.ALREADY_EXISTS.toString(),
Code.PERMISSION_DENIED.toString(),
Code.RESOURCE_EXHAUSTED.toString(),
Code.FAILED_PRECONDITION.toString(),
Code.ABORTED.toString(),
Code.OUT_OF_RANGE.toString(),
Code.UNIMPLEMENTED.toString(),
Code.INTERNAL.toString(),
Code.UNAVAILABLE.toString(),
Code.DATA_LOSS.toString(),
Code.UNAUTHENTICATED.toString()));
}
});
}
}));
}
};
static Map<String, ?> buildRetryPolicy(final FlagdOptions options) {
return new HashMap() {
{
put("methodConfig", Arrays.asList(new HashMap() {
{
put(
"name",
Arrays.asList(
new HashMap() {
{
put("service", "flagd.sync.v1.FlagSyncService");
}
},
new HashMap() {
{
put("service", "flagd.evaluation.v1.Service");
}
}));
put("retryPolicy", new HashMap() {
{
// 1 + 2 + 4
put("maxAttempts", 3.0); // types used here are important, need to be doubles
put("initialBackoff", "1s");
put(
"maxBackoff",
options.getRetryBackoffMaxMs() >= 1000
? String.format("%ds", options.getRetryBackoffMaxMs() / 1000)
: "1s");
put("backoffMultiplier", 2.0);
// status codes to retry on:
put(
"retryableStatusCodes",
Arrays.asList(
/*
* As per gRPC spec, the following status codes are safe to retry:
* UNAVAILABLE, UNKNOWN,
*/
Code.UNKNOWN.toString(), Code.UNAVAILABLE.toString()));
}
});
}
}));
}
};
}

private ChannelBuilder() {}

Expand All @@ -137,7 +99,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
.channelType(EpollDomainSocketChannel.class)
.usePlaintext()
.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
.defaultServiceConfig(buildRetryPolicy(options))
.enableRetry()
.build();
}
Expand Down Expand Up @@ -183,7 +145,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
}

return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
return builder.defaultServiceConfig(buildRetryPolicy(options))
.enableRetry()
.build();
} catch (SSLException ssle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class SyncStreamQueueSource implements QueueSource {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final int streamDeadline;
private final int deadline;
private final int maxBackoffMs;
private final String selector;
private final String providerId;
private final boolean syncMetadataDisabled;
Expand All @@ -56,6 +57,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
deadline = options.getDeadline();
selector = options.getSelector();
providerId = options.getProviderId();
maxBackoffMs = options.getRetryBackoffMaxMs();
syncMetadataDisabled = options.isSyncMetadataDisabled();
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
flagSyncStub =
Expand All @@ -75,6 +77,7 @@ protected SyncStreamQueueSource(
selector = options.getSelector();
providerId = options.getProviderId();
channelConnector = connectorMock;
maxBackoffMs = options.getRetryBackoffMaxMs();
flagSyncStub = stubMock;
syncMetadataDisabled = options.isSyncMetadataDisabled();
metadataStub = blockingStubMock;
Expand Down Expand Up @@ -110,26 +113,32 @@ private void observeSyncStream() {
log.info("Initializing sync stream observer");

// outer loop for re-issuing the stream request
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
// "waitForReady" on the channel, plus our retry policy slow this loop down in
// error conditions
while (!shutdown.get()) {
log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);

try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
continue;
}

try {
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
Thread.sleep(this.maxBackoffMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this sleep helps prevent tight loops during retries, which can be a problem if a upstream connection in a proxy is down.

continue;
}

try {
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
Thread.sleep(this.maxBackoffMs);
}
} catch (InterruptedException ie) {
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ void testNettyChannel_withSocketPath_withRetryPolicy() {
NettyChannelBuilder mockBuilder = mock(NettyChannelBuilder.class);
ManagedChannel mockChannel = mock(ManagedChannel.class);

// Input options
FlagdOptions options = FlagdOptions.builder()
.socketPath("/path/to/socket")
.keepAlive(1000)
.build();

nettyMock
.when(() -> NettyChannelBuilder.forAddress(any(DomainSocketAddress.class)))
.thenReturn(mockBuilder);
Expand All @@ -110,18 +116,12 @@ void testNettyChannel_withSocketPath_withRetryPolicy() {
when(mockBuilder.eventLoopGroup(any(MultiThreadIoEventLoopGroup.class)))
.thenReturn(mockBuilder);
when(mockBuilder.channelType(EpollDomainSocketChannel.class)).thenReturn(mockBuilder);
when(mockBuilder.defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY))
when(mockBuilder.defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options)))
.thenReturn(mockBuilder);
doReturn(mockBuilder).when(mockBuilder).enableRetry();
when(mockBuilder.usePlaintext()).thenReturn(mockBuilder);
when(mockBuilder.build()).thenReturn(mockChannel);

// Input options
FlagdOptions options = FlagdOptions.builder()
.socketPath("/path/to/socket")
.keepAlive(1000)
.build();

// Call method under test
ManagedChannel channel = ChannelBuilder.nettyChannel(options);

Expand All @@ -131,7 +131,7 @@ void testNettyChannel_withSocketPath_withRetryPolicy() {
verify(mockBuilder).keepAliveTime(1000, TimeUnit.MILLISECONDS);
verify(mockBuilder).eventLoopGroup(any(MultiThreadIoEventLoopGroup.class));
verify(mockBuilder).channelType(EpollDomainSocketChannel.class);
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY);
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options));
verify(mockBuilder).usePlaintext();
verify(mockBuilder).build();
}
Expand Down Expand Up @@ -165,7 +165,7 @@ void testNettyChannel_withRetryPolicy() {
// Assertions
assertThat(channel).isEqualTo(mockChannel);
nettyMock.verify(() -> NettyChannelBuilder.forTarget("localhost:8080"));
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY);
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options));
verify(mockBuilder).enableRetry();
verify(mockBuilder).build();
}
Expand Down
Loading
Loading