Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
893cd1c
fix(flagd): Remove one level of queueing in SyncStreamQueueSource and…
guidobrei Sep 16, 2025
67b190d
fix(flagd): Use default instance of GetMetadataRequest
guidobrei Sep 17, 2025
5b47a15
fix(flagd): Keep error reporting equivalent to previous implementation
guidobrei Sep 17, 2025
e972a80
fix(flagd): Fix checkstyle issues
guidobrei Sep 17, 2025
7991058
fix(flagd): Fail early in syncFlags
guidobrei Sep 17, 2025
f4a93d6
fix(flagd): Add thread interruption
guidobrei Sep 17, 2025
774bdee
fix(flagd): Fix Spotless complaints
guidobrei Sep 18, 2025
acab1f3
fix(flagd): Get rid of gRPC context cancellation
guidobrei Sep 19, 2025
3ad54de
fix(flagd): Offer error if getMetadata fails
guidobrei Sep 19, 2025
fc963ce
fixup: spotless
toddbaert Sep 23, 2025
c7c1a3a
Fix error message
guidobrei Sep 23, 2025
0c41114
fix(flagd): Ignore unimplemented getMetadata
guidobrei Sep 23, 2025
0c29e37
fix(flagd): Update retryable status codes in retry logic
guidobrei Sep 24, 2025
8baba19
fix(flagd): Simplified ChannelConnector
guidobrei Sep 24, 2025
ddffc09
fix(flagd): Extended retry logic for SyncFlags
guidobrei Sep 24, 2025
1920779
fix(flagd): Properly handle metadata unimplemented status code
guidobrei Oct 1, 2025
86de034
fix(flagd): Remove unused imports
guidobrei Oct 1, 2025
9a2ef7b
fix(flagd): Please spotless
guidobrei Oct 3, 2025
95914e6
fixup: add sleep delay to prevent busy loop
toddbaert Oct 13, 2025
d0286a9
Apply suggestion from @toddbaert
toddbaert Oct 14, 2025
455b362
fixup: default
toddbaert Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public final class Config {
static final String DEFAULT_HOST = "localhost";

static final int DEFAULT_DEADLINE = 500;
// Default maximum retry backoff, in milliseconds; used as the fallback for FLAGD_RETRY_BACKOFF_MAX_MS.
// The previous value of 5 was below the 1000 ms floor applied when the gRPC retry policy is built.
static final int DEFAULT_MAX_RETRY_BACKOFF = 12000;
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
Expand All @@ -31,6 +32,7 @@ public final class Config {
static final String MAX_CACHE_SIZE_ENV_VAR_NAME = "FLAGD_MAX_CACHE_SIZE";
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,18 @@ public class FlagdOptions {
@Builder.Default
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);

/**
* Max stream retry backoff in milliseconds.
* Initialized through {@code fallBackToEnvOrDefault} using the variable name
* {@link Config#FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME} and the default
* {@link Config#DEFAULT_MAX_RETRY_BACKOFF}; can be overridden via the builder.
*/
@Builder.Default
private int retryBackoffMaxMs =
fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already a defined param we never implemented, so good to have now.

We use this also for the sleep time for non-transient errors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're mixing ms and s here.

Is `static final int DEFAULT_MAX_RETRY_BACKOFF = 5;` a duration or a count?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a time. I will make the default name more clear. It should also be 12000 as per https://flagd.dev/providers/rust/?h=flagd_retry_backoff_max_ms#configuration-options... nice catch.... this was leftover from my initial version before I realized we had an unimplemented var for this.


/**
* Streaming connection deadline in milliseconds.
* Set to 0 to disable the deadline.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from
* killing idle connections.
*/
@Builder.Default
private int streamDeadlineMs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,67 +24,57 @@
import javax.net.ssl.SSLException;

/** gRPC channel builder helper. */
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this")
public class ChannelBuilder {

/**
* Controls retry (not-reconnection) policy for failed RPCs.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static final Map<String, ?> SERVICE_CONFIG_WITH_RETRY = new HashMap() {
{
put("methodConfig", Arrays.asList(new HashMap() {
{
put(
"name",
Arrays.asList(
new HashMap() {
{
put("service", "flagd.sync.v1.FlagSyncService");
}
},
new HashMap() {
{
put("service", "flagd.evaluation.v1.Service");
}
}));
put("retryPolicy", new HashMap() {
{
// 1 + 2 + 4
put("maxAttempts", 3.0); // types used here are important, need to be doubles
put("initialBackoff", "1s");
put("maxBackoff", "5s");
put("backoffMultiplier", 2.0);
// status codes to retry on:
put(
"retryableStatusCodes",
Arrays.asList(
/*
* All codes are retryable except OK and DEADLINE_EXCEEDED since
* any others not listed here cause a very tight loop of retries.
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
* and definitionally should not result in a tight loop (it's a timeout).
*/
Code.CANCELLED.toString(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed - these were only here to slow down our loop, which we do with a sleep now.

Code.UNKNOWN.toString(),
Code.INVALID_ARGUMENT.toString(),
Code.NOT_FOUND.toString(),
Code.ALREADY_EXISTS.toString(),
Code.PERMISSION_DENIED.toString(),
Code.RESOURCE_EXHAUSTED.toString(),
Code.FAILED_PRECONDITION.toString(),
Code.ABORTED.toString(),
Code.OUT_OF_RANGE.toString(),
Code.UNIMPLEMENTED.toString(),
Code.INTERNAL.toString(),
Code.UNAVAILABLE.toString(),
Code.DATA_LOSS.toString(),
Code.UNAUTHENTICATED.toString()));
}
});
}
}));
}
};
/**
 * Builds the gRPC service config that controls the retry policy for failed RPCs on the
 * {@code flagd.sync.v1.FlagSyncService} and {@code flagd.evaluation.v1.Service} services.
 *
 * <p>The maximum backoff is derived from {@link FlagdOptions#getRetryBackoffMaxMs()}: it is
 * truncated to whole seconds and never drops below one second.
 *
 * @param options provider options supplying the maximum retry backoff in milliseconds
 * @return a new, mutable service config map suitable for {@code defaultServiceConfig}
 */
static Map<String, ?> buildRetryPolicy(final FlagdOptions options) {
    // Build the duration by concatenation rather than String.format("%d"), which localizes
    // digits using the default locale and could yield a string gRPC cannot parse.
    final int maxBackoffSeconds = Math.max(1, options.getRetryBackoffMaxMs() / 1000);
    final String maxBackoff = maxBackoffSeconds + "s";

    final Map<String, Object> retryPolicy = new HashMap<>();
    // types used here are important: numeric values need to be doubles
    retryPolicy.put("maxAttempts", 3.0); // total attempts, including the original call
    retryPolicy.put("initialBackoff", "1s");
    retryPolicy.put("maxBackoff", maxBackoff);
    retryPolicy.put("backoffMultiplier", 2.0);
    // Status codes to retry on: only UNKNOWN and UNAVAILABLE are treated as retryable.
    retryPolicy.put(
            "retryableStatusCodes", Arrays.asList(Code.UNKNOWN.toString(), Code.UNAVAILABLE.toString()));

    final Map<String, Object> methodConfig = new HashMap<>();
    methodConfig.put(
            "name",
            Arrays.asList(
                    serviceName("flagd.sync.v1.FlagSyncService"),
                    serviceName("flagd.evaluation.v1.Service")));
    methodConfig.put("retryPolicy", retryPolicy);

    final Map<String, Object> serviceConfig = new HashMap<>();
    serviceConfig.put("methodConfig", Arrays.asList(methodConfig));
    return serviceConfig;
}

/** Creates a single {@code name} entry of a method config that selects a whole service. */
private static Map<String, Object> serviceName(final String service) {
    final Map<String, Object> entry = new HashMap<>();
    entry.put("service", service);
    return entry;
}

private ChannelBuilder() {}

Expand All @@ -109,7 +99,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
.channelType(EpollDomainSocketChannel.class)
.usePlaintext()
.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
.defaultServiceConfig(buildRetryPolicy(options))
.enableRetry()
.build();
}
Expand Down Expand Up @@ -155,7 +145,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
}

return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
return builder.defaultServiceConfig(buildRetryPolicy(options))
.enableRetry()
.build();
} catch (SSLException ssle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import dev.openfeature.sdk.ProviderEvent;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.stub.AbstractBlockingStub;
import io.grpc.stub.AbstractStub;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -16,12 +14,9 @@
/**
* A generic GRPC connector that manages connection states, reconnection logic, and event streaming for
* GRPC services.
*
* @param <T> the type of the asynchronous stub for the GRPC service
* @param <K> the type of the blocking stub for the GRPC service
*/
@Slf4j
public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {
public class ChannelConnector {

/**
* The GRPC managed channel for managing the underlying GRPC connection.
Expand All @@ -48,22 +43,11 @@ public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlock
*/
public ChannelConnector(
        final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent, ManagedChannel channel) {
    // Callback that receives connection events.
    this.onConnectionEvent = onConnectionEvent;
    // Deadline taken from the provider options.
    this.deadline = options.getDeadline();
    // Channel supplied by the caller; this constructor does not create one.
    this.channel = channel;
}

/**
* Constructs a {@code ChannelConnector} instance for testing purposes.
*
* <p>Delegates to the three-argument constructor, passing a channel obtained from
* {@code ChannelBuilder.nettyChannel(options)}.
*
* @param options the configuration options for the GRPC connection
* @param onConnectionEvent a consumer to handle connection events
*/
public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent) {
this(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
}

/**
* Initializes the GRPC connection by waiting for the channel to be ready and monitoring its state.
*
Expand Down
Loading
Loading