Skip to content

Commit 60b7e0f

Browse files
committed
fixup: add sleep delay to prevent busy loop
Signed-off-by: Todd Baert <[email protected]>
1 parent 1eccc02 commit 60b7e0f

File tree

6 files changed

+145
-131
lines changed

6 files changed

+145
-131
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public final class Config {
1414
static final String DEFAULT_HOST = "localhost";
1515

1616
static final int DEFAULT_DEADLINE = 500;
17+
// Milliseconds (overridable via FLAGD_RETRY_BACKOFF_MAX_MS).
// 5000 ms keeps the 5s cap the gRPC retry policy used before this became configurable.
static final int DEFAULT_MAX_RETRY_BACKOFF = 5000;
1718
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
1819
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
1920
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
@@ -32,6 +33,7 @@ public final class Config {
3233
static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
3334
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
3435
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
36+
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
3537
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
3638
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
3739
static final String SOURCE_PROVIDER_ID_ENV_VAR_NAME = "FLAGD_SOURCE_PROVIDER_ID";

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,18 @@ public class FlagdOptions {
9797
@Builder.Default
9898
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);
9999

100+
/**
101+
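* Max retry backoff in milliseconds. Caps the backoff of the gRPC retry policy and is
* the delay applied before the sync stream is re-established after an error.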
102+
*/
103+
@Builder.Default
104+
private int retryBackoffMaxMs =
105+
fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF);
106+
100107
/**
101108
* Streaming connection deadline in milliseconds.
102109
* Set to 0 to disable the deadline.
103-
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
110+
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from
111+
* killing idle connections.
104112
*/
105113
@Builder.Default
106114
private int streamDeadlineMs =

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 47 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -25,94 +25,55 @@
2525

2626
/** gRPC channel builder helper. */
2727
public class ChannelBuilder {
28-
29-
@SuppressWarnings({"unchecked", "rawtypes"})
30-
private static final Map<String, ?> DEFAULT_RETRY_POLICY = new HashMap() {
31-
{
32-
// 1s + 2s + 4s
33-
put("maxAttempts", 3.0); // types used here are important, need to be doubles
34-
put("initialBackoff", "1s");
35-
put("maxBackoff", "5s");
36-
put("backoffMultiplier", 2.0);
37-
// status codes to retry on:
38-
put(
39-
"retryableStatusCodes",
40-
Arrays.asList(
41-
/*
42-
* Only states UNAVAILABLE and UNKNOWN should be retried. All
43-
* other failure states will probably not be resolved with a simple retry.
44-
*/
45-
Code.UNAVAILABLE.toString(), Code.UNKNOWN.toString()));
46-
}
47-
};
48-
4928
/**
5029
* Controls retry (not-reconnection) policy for failed RPCs.
5130
*/
5231
@SuppressWarnings({"unchecked", "rawtypes"})
53-
static final Map<String, ?> SERVICE_CONFIG_WITH_RETRY = new HashMap() {
54-
{
55-
put("methodConfig", Arrays.asList(new HashMap() {
56-
{
57-
put(
58-
"name",
59-
Arrays.asList(
60-
new HashMap() {
61-
{
62-
put("service", "flagd.sync.v1.FlagSyncService");
63-
}
64-
},
65-
new HashMap() {
66-
{
67-
put("service", "flagd.evaluation.v1.Service");
68-
}
69-
}));
70-
put("retryPolicy", DEFAULT_RETRY_POLICY);
71-
}
72-
73-
{
74-
put("name", Arrays.asList(new HashMap() {
75-
{
76-
put("service", "flagd.sync.v1.FlagSyncService");
77-
put("method", "SyncFlags");
78-
}
79-
}));
80-
put("retryPolicy", new HashMap(DEFAULT_RETRY_POLICY) {
81-
{
82-
// 1s + 2s + 4s + 5s + 5s + 5s + 5s + 5s + 5s + 5s
83-
put("maxAttempts", 12.0);
84-
// for streaming Retry on more status codes
85-
put(
86-
"retryableStatusCodes",
87-
Arrays.asList(
88-
/*
89-
* All codes are retryable except OK and DEADLINE_EXCEEDED since
90-
* any others not listed here cause a very tight loop of retries.
91-
* DEADLINE_EXCEEDED is typically a result of a client specified
92-
* deadline,and definitionally should not result in a tight loop
93-
* (it's a timeout).
94-
*/
95-
Code.CANCELLED.toString(),
96-
Code.UNKNOWN.toString(),
97-
Code.INVALID_ARGUMENT.toString(),
98-
Code.NOT_FOUND.toString(),
99-
Code.ALREADY_EXISTS.toString(),
100-
Code.PERMISSION_DENIED.toString(),
101-
Code.RESOURCE_EXHAUSTED.toString(),
102-
Code.FAILED_PRECONDITION.toString(),
103-
Code.ABORTED.toString(),
104-
Code.OUT_OF_RANGE.toString(),
105-
Code.UNIMPLEMENTED.toString(),
106-
Code.INTERNAL.toString(),
107-
Code.UNAVAILABLE.toString(),
108-
Code.DATA_LOSS.toString(),
109-
Code.UNAUTHENTICATED.toString()));
110-
}
111-
});
112-
}
113-
}));
114-
}
115-
};
32+
static Map<String, ?> buildRetryPolicy(FlagdOptions options) {
33+
return new HashMap() {
34+
{
35+
put("methodConfig", Arrays.asList(new HashMap() {
36+
{
37+
put(
38+
"name",
39+
Arrays.asList(
40+
new HashMap() {
41+
{
42+
put("service", "flagd.sync.v1.FlagSyncService");
43+
}
44+
},
45+
new HashMap() {
46+
{
47+
put("service", "flagd.evaluation.v1.Service");
48+
}
49+
}));
50+
put("retryPolicy", new HashMap() {
51+
{
52+
// 1 + 2 + 4
53+
put("maxAttempts", 3.0); // types used here are important, need to be doubles
54+
put("initialBackoff", "1s");
55+
put(
56+
"maxBackoff",
57+
options.getRetryBackoffMaxMs() >= 1000
58+
? String.format("%ds", options.getRetryBackoffMaxMs() / 1000)
59+
: "1s");
60+
put("backoffMultiplier", 2.0);
61+
// status codes to retry on:
62+
put(
63+
"retryableStatusCodes",
64+
Arrays.asList(
65+
/*
66+
* As per gRPC spec, the following status codes are safe to retry:
67+
* UNAVAILABLE, UNKNOWN,
68+
*/
69+
Code.UNKNOWN.toString(), Code.UNAVAILABLE.toString()));
70+
}
71+
});
72+
}
73+
}));
74+
}
75+
};
76+
}
11677

11778
private ChannelBuilder() {}
11879

@@ -137,7 +98,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
13798
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
13899
.channelType(EpollDomainSocketChannel.class)
139100
.usePlaintext()
140-
.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
101+
.defaultServiceConfig(buildRetryPolicy(options))
141102
.enableRetry()
142103
.build();
143104
}
@@ -183,7 +144,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
183144
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
184145
}
185146

186-
return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
147+
return builder.defaultServiceConfig(buildRetryPolicy(options))
187148
.enableRetry()
188149
.build();
189150
} catch (SSLException ssle) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class SyncStreamQueueSource implements QueueSource {
4040
private final AtomicBoolean shutdown = new AtomicBoolean(false);
4141
private final int streamDeadline;
4242
private final int deadline;
43+
private final int maxBackoffMs;
4344
private final String selector;
4445
private final String providerId;
4546
private final boolean syncMetadataDisabled;
@@ -56,6 +57,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
5657
deadline = options.getDeadline();
5758
selector = options.getSelector();
5859
providerId = options.getProviderId();
60+
maxBackoffMs = options.getRetryBackoffMaxMs();
5961
syncMetadataDisabled = options.isSyncMetadataDisabled();
6062
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
6163
flagSyncStub =
@@ -75,6 +77,7 @@ protected SyncStreamQueueSource(
7577
selector = options.getSelector();
7678
providerId = options.getProviderId();
7779
channelConnector = connectorMock;
80+
maxBackoffMs = options.getRetryBackoffMaxMs();
7881
flagSyncStub = stubMock;
7982
syncMetadataDisabled = options.isSyncMetadataDisabled();
8083
metadataStub = blockingStubMock;
@@ -110,26 +113,32 @@ private void observeSyncStream() {
110113
log.info("Initializing sync stream observer");
111114

112115
// outer loop for re-issuing the stream request
113-
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
116+
// "waitForReady" on the channel, plus our retry policy slow this loop down in
117+
// error conditions
114118
while (!shutdown.get()) {
115-
log.debug("Initializing sync stream request");
116-
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
117-
118-
try {
119-
observer.metadata = getMetadata();
120-
} catch (Exception metaEx) {
121-
// retry if getMetadata fails
122-
String message = metaEx.getMessage();
123-
log.debug("Metadata request error: {}, will restart", message, metaEx);
124-
enqueueError(String.format("Error in getMetadata request: %s", message));
125-
continue;
126-
}
127-
128119
try {
129-
syncFlags(observer);
130-
} catch (Exception ex) {
131-
log.error("Unexpected sync stream exception, will restart.", ex);
132-
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
120+
log.debug("Initializing sync stream request");
121+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
122+
try {
123+
observer.metadata = getMetadata();
124+
} catch (Exception metaEx) {
125+
// retry if getMetadata fails
126+
String message = metaEx.getMessage();
127+
log.debug("Metadata request error: {}, will restart", message, metaEx);
128+
enqueueError(String.format("Error in getMetadata request: %s", message));
129+
Thread.sleep(this.maxBackoffMs);
130+
continue;
131+
}
132+
133+
try {
134+
syncFlags(observer);
135+
} catch (Exception ex) {
136+
log.error("Unexpected sync stream exception, will restart.", ex);
137+
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
138+
Thread.sleep(this.maxBackoffMs);
139+
}
140+
} catch (InterruptedException ie) {
141+
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
133142
}
134143
}
135144

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilderTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ void testNettyChannel_withSocketPath_withRetryPolicy() {
102102
NettyChannelBuilder mockBuilder = mock(NettyChannelBuilder.class);
103103
ManagedChannel mockChannel = mock(ManagedChannel.class);
104104

105+
// Input options
106+
FlagdOptions options = FlagdOptions.builder()
107+
.socketPath("/path/to/socket")
108+
.keepAlive(1000)
109+
.build();
110+
105111
nettyMock
106112
.when(() -> NettyChannelBuilder.forAddress(any(DomainSocketAddress.class)))
107113
.thenReturn(mockBuilder);
@@ -110,18 +116,12 @@ void testNettyChannel_withSocketPath_withRetryPolicy() {
110116
when(mockBuilder.eventLoopGroup(any(MultiThreadIoEventLoopGroup.class)))
111117
.thenReturn(mockBuilder);
112118
when(mockBuilder.channelType(EpollDomainSocketChannel.class)).thenReturn(mockBuilder);
113-
when(mockBuilder.defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY))
119+
when(mockBuilder.defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options)))
114120
.thenReturn(mockBuilder);
115121
doReturn(mockBuilder).when(mockBuilder).enableRetry();
116122
when(mockBuilder.usePlaintext()).thenReturn(mockBuilder);
117123
when(mockBuilder.build()).thenReturn(mockChannel);
118124

119-
// Input options
120-
FlagdOptions options = FlagdOptions.builder()
121-
.socketPath("/path/to/socket")
122-
.keepAlive(1000)
123-
.build();
124-
125125
// Call method under test
126126
ManagedChannel channel = ChannelBuilder.nettyChannel(options);
127127

@@ -131,7 +131,7 @@ void testNettyChannel_withSocketPath_withRetryPolicy() {
131131
verify(mockBuilder).keepAliveTime(1000, TimeUnit.MILLISECONDS);
132132
verify(mockBuilder).eventLoopGroup(any(MultiThreadIoEventLoopGroup.class));
133133
verify(mockBuilder).channelType(EpollDomainSocketChannel.class);
134-
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY);
134+
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options));
135135
verify(mockBuilder).usePlaintext();
136136
verify(mockBuilder).build();
137137
}
@@ -165,7 +165,7 @@ void testNettyChannel_withRetryPolicy() {
165165
// Assertions
166166
assertThat(channel).isEqualTo(mockChannel);
167167
nettyMock.verify(() -> NettyChannelBuilder.forTarget("localhost:8080"));
168-
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.SERVICE_CONFIG_WITH_RETRY);
168+
verify(mockBuilder).defaultServiceConfig(ChannelBuilder.buildRetryPolicy(options));
169169
verify(mockBuilder).enableRetry();
170170
verify(mockBuilder).build();
171171
}

0 commit comments

Comments
 (0)