Skip to content

Commit b4c62f7

Browse files
committed
fixup: add sleep delay to prevent busy loop
Signed-off-by: Todd Baert <[email protected]>
1 parent 1eccc02 commit b4c62f7

File tree

5 files changed

+109
-89
lines changed

5 files changed

+109
-89
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public final class Config {
1414
static final String DEFAULT_HOST = "localhost";
1515

1616
static final int DEFAULT_DEADLINE = 500;
17+
static final int DEFAULT_MAX_RETRY_BACKOFF = 5;
1718
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
1819
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
1920
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
@@ -32,6 +33,7 @@ public final class Config {
3233
static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
3334
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
3435
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
36+
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
3537
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
3638
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
3739
static final String SOURCE_PROVIDER_ID_ENV_VAR_NAME = "FLAGD_SOURCE_PROVIDER_ID";

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,18 @@ public class FlagdOptions {
9797
@Builder.Default
9898
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);
9999

100+
/**
101+
* Max stream retry backoff in milliseconds.
102+
*/
103+
@Builder.Default
104+
private int retryBackoffMaxMs =
105+
fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF);
106+
100107
/**
101108
* Streaming connection deadline in milliseconds.
102109
* Set to 0 to disable the deadline.
103-
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
110+
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from
111+
* killing idle connections.
104112
*/
105113
@Builder.Default
106114
private int streamDeadlineMs =

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,11 @@
2626
/** gRPC channel builder helper. */
2727
public class ChannelBuilder {
2828

29-
@SuppressWarnings({"unchecked", "rawtypes"})
30-
private static final Map<String, ?> DEFAULT_RETRY_POLICY = new HashMap() {
31-
{
32-
// 1s + 2s + 4s
33-
put("maxAttempts", 3.0); // types used here are important, need to be doubles
34-
put("initialBackoff", "1s");
35-
put("maxBackoff", "5s");
36-
put("backoffMultiplier", 2.0);
37-
// status codes to retry on:
38-
put(
39-
"retryableStatusCodes",
40-
Arrays.asList(
41-
/*
42-
* Only states UNAVAILABLE and UNKNOWN should be retried. All
43-
* other failure states will probably not be resolved with a simple retry.
44-
*/
45-
Code.UNAVAILABLE.toString(), Code.UNKNOWN.toString()));
46-
}
47-
};
29+
/**
30+
* Maximum retry backoff in seconds.
31+
* Also used by SyncStreamQueueSource to control outer loop retry delay.
32+
*/
33+
public static final int MAX_RETRY_BACKOFF_SECONDS = 5;
4834

4935
/**
5036
* Controls retry (not-reconnection) policy for failed RPCs.
@@ -67,46 +53,22 @@ public class ChannelBuilder {
6753
put("service", "flagd.evaluation.v1.Service");
6854
}
6955
}));
70-
put("retryPolicy", DEFAULT_RETRY_POLICY);
71-
}
72-
73-
{
74-
put("name", Arrays.asList(new HashMap() {
75-
{
76-
put("service", "flagd.sync.v1.FlagSyncService");
77-
put("method", "SyncFlags");
78-
}
79-
}));
80-
put("retryPolicy", new HashMap(DEFAULT_RETRY_POLICY) {
56+
put("retryPolicy", new HashMap() {
8157
{
82-
// 1s + 2s + 4s + 5s + 5s + 5s + 5s + 5s + 5s + 5s
83-
put("maxAttempts", 12.0);
84-
// for streaming Retry on more status codes
58+
// 1 + 2 + 4
59+
put("maxAttempts", 3.0); // types used here are important, need to be doubles
60+
put("initialBackoff", "1s");
61+
put("maxBackoff", String.format("%ds", MAX_RETRY_BACKOFF_SECONDS));
62+
put("backoffMultiplier", 2.0);
63+
// status codes to retry on:
8564
put(
8665
"retryableStatusCodes",
8766
Arrays.asList(
8867
/*
89-
* All codes are retryable except OK and DEADLINE_EXCEEDED since
90-
* any others not listed here cause a very tight loop of retries.
91-
* DEADLINE_EXCEEDED is typically a result of a client specified
92-
* deadline,and definitionally should not result in a tight loop
93-
* (it's a timeout).
68+
* As per gRPC spec, the following status codes are safe to retry:
69+
* UNAVAILABLE, UNKNOWN,
9470
*/
95-
Code.CANCELLED.toString(),
96-
Code.UNKNOWN.toString(),
97-
Code.INVALID_ARGUMENT.toString(),
98-
Code.NOT_FOUND.toString(),
99-
Code.ALREADY_EXISTS.toString(),
100-
Code.PERMISSION_DENIED.toString(),
101-
Code.RESOURCE_EXHAUSTED.toString(),
102-
Code.FAILED_PRECONDITION.toString(),
103-
Code.ABORTED.toString(),
104-
Code.OUT_OF_RANGE.toString(),
105-
Code.UNIMPLEMENTED.toString(),
106-
Code.INTERNAL.toString(),
107-
Code.UNAVAILABLE.toString(),
108-
Code.DATA_LOSS.toString(),
109-
Code.UNAUTHENTICATED.toString()));
71+
Code.UNKNOWN.toString(), Code.UNAVAILABLE.toString()));
11072
}
11173
});
11274
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,17 @@
3535
value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"},
3636
justification = "Random is used to generate a variation & flag configurations require exposing")
3737
public class SyncStreamQueueSource implements QueueSource {
38+
/**
39+
* Delay between retries of the outer loop, in milliseconds. This is derived from the channel retry (5s)
40+
*/
41+
public static final int RETRY_LOOP_DELAY_MS = ChannelBuilder.MAX_RETRY_BACKOFF_SECONDS * 1000;
42+
3843
private static final int QUEUE_SIZE = 5;
3944

4045
private final AtomicBoolean shutdown = new AtomicBoolean(false);
4146
private final int streamDeadline;
4247
private final int deadline;
48+
private final int maxBackoffMs;
4349
private final String selector;
4450
private final String providerId;
4551
private final boolean syncMetadataDisabled;
@@ -56,6 +62,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
5662
deadline = options.getDeadline();
5763
selector = options.getSelector();
5864
providerId = options.getProviderId();
65+
maxBackoffMs = options.getRetryBackoffMaxMs();
5966
syncMetadataDisabled = options.isSyncMetadataDisabled();
6067
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
6168
flagSyncStub =
@@ -75,6 +82,7 @@ protected SyncStreamQueueSource(
7582
selector = options.getSelector();
7683
providerId = options.getProviderId();
7784
channelConnector = connectorMock;
85+
maxBackoffMs = options.getRetryBackoffMaxMs();
7886
flagSyncStub = stubMock;
7987
syncMetadataDisabled = options.isSyncMetadataDisabled();
8088
metadataStub = blockingStubMock;
@@ -110,26 +118,32 @@ private void observeSyncStream() {
110118
log.info("Initializing sync stream observer");
111119

112120
// outer loop for re-issuing the stream request
113-
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
121+
// "waitForReady" on the channel, plus our retry policy slow this loop down in
122+
// error conditions
114123
while (!shutdown.get()) {
115-
log.debug("Initializing sync stream request");
116-
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
117-
118-
try {
119-
observer.metadata = getMetadata();
120-
} catch (Exception metaEx) {
121-
// retry if getMetadata fails
122-
String message = metaEx.getMessage();
123-
log.debug("Metadata request error: {}, will restart", message, metaEx);
124-
enqueueError(String.format("Error in getMetadata request: %s", message));
125-
continue;
126-
}
127-
128124
try {
129-
syncFlags(observer);
130-
} catch (Exception ex) {
131-
log.error("Unexpected sync stream exception, will restart.", ex);
132-
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
125+
log.debug("Initializing sync stream request");
126+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
127+
try {
128+
observer.metadata = getMetadata();
129+
} catch (Exception metaEx) {
130+
// retry if getMetadata fails
131+
String message = metaEx.getMessage();
132+
log.debug("Metadata request error: {}, will restart", message, metaEx);
133+
enqueueError(String.format("Error in getMetadata request: %s", message));
134+
Thread.sleep(this.maxBackoffMs);
135+
continue;
136+
}
137+
138+
try {
139+
syncFlags(observer);
140+
} catch (Exception ex) {
141+
log.error("Unexpected sync stream exception, will restart.", ex);
142+
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
143+
Thread.sleep(this.maxBackoffMs);
144+
}
145+
} catch (InterruptedException ie) {
146+
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
133147
}
134148
}
135149

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
2424
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
2525
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
26+
import io.grpc.StatusRuntimeException;
2627
import io.grpc.stub.StreamObserver;
2728
import java.util.concurrent.BlockingQueue;
2829
import java.util.concurrent.CountDownLatch;
@@ -35,11 +36,12 @@ class SyncStreamQueueSourceTest {
3536
private ChannelConnector mockConnector;
3637
private FlagSyncServiceBlockingStub blockingStub;
3738
private FlagSyncServiceStub stub;
39+
private FlagSyncServiceStub errorStub;
3840
private StreamObserver<SyncFlagsResponse> observer;
3941
private CountDownLatch latch; // used to wait for observer to be initialized
4042

4143
@BeforeEach
42-
public void init() throws Exception {
44+
public void setup() throws Exception {
4345
blockingStub = mock(FlagSyncServiceBlockingStub.class);
4446
when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub);
4547
when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance());
@@ -57,22 +59,54 @@ public void init() throws Exception {
5759
})
5860
.when(stub)
5961
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
60-
// method
62+
63+
errorStub = mock(FlagSyncServiceStub.class);
64+
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub);
65+
doAnswer((Answer<Void>) invocation -> {
66+
Object[] args = invocation.getArguments();
67+
observer = (StreamObserver<SyncFlagsResponse>) args[1];
68+
latch.countDown();
69+
throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND);
70+
})
71+
.when(errorStub)
72+
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
73+
}
74+
75+
@Test
76+
void initError_DoesNotBusyWait() throws Exception {
77+
// make sure we do not spin in a busy loop on errors
78+
79+
int maxBackoffMs = 1000;
80+
SyncStreamQueueSource queueSource = new SyncStreamQueueSource(
81+
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), mockConnector, errorStub, blockingStub);
82+
latch = new CountDownLatch(1);
83+
queueSource.init();
84+
latch.await();
85+
86+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
87+
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
88+
assertNotNull(payload);
89+
assertEquals(QueuePayloadType.ERROR, payload.getType());
90+
Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for reties
91+
92+
// should have retried the stream (2 calls); initial + 1 retry
93+
// it's very important that the retry count is low, to confirm no busy-loop
94+
verify(errorStub, times(2)).syncFlags(any(), any());
6195
}
6296

6397
@Test
6498
void onNextEnqueuesDataPayload() throws Exception {
65-
SyncStreamQueueSource connector =
99+
SyncStreamQueueSource queueSource =
66100
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
67101
latch = new CountDownLatch(1);
68-
connector.init();
102+
queueSource.init();
69103
latch.await();
70104

71105
// fire onNext (data) event
72106
observer.onNext(SyncFlagsResponse.newBuilder().build());
73107

74108
// should enqueue data payload
75-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
109+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
76110
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
77111
assertNotNull(payload);
78112
assertNotNull(payload.getSyncContext());
@@ -84,17 +118,17 @@ void onNextEnqueuesDataPayload() throws Exception {
84118
@Test
85119
void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
86120
// disable GetMetadata call
87-
SyncStreamQueueSource connector = new SyncStreamQueueSource(
121+
SyncStreamQueueSource queueSource = new SyncStreamQueueSource(
88122
FlagdOptions.builder().syncMetadataDisabled(true).build(), mockConnector, stub, blockingStub);
89123
latch = new CountDownLatch(1);
90-
connector.init();
124+
queueSource.init();
91125
latch.await();
92126

93127
// fire onNext (data) event
94128
observer.onNext(SyncFlagsResponse.newBuilder().build());
95129

96130
// should enqueue data payload
97-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
131+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
98132
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
99133
assertNotNull(payload);
100134
assertNull(payload.getSyncContext());
@@ -108,10 +142,10 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
108142
@Test
109143
void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
110144
// disable GetMetadata call
111-
SyncStreamQueueSource connector =
145+
SyncStreamQueueSource queueSource =
112146
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
113147
latch = new CountDownLatch(1);
114-
connector.init();
148+
queueSource.init();
115149
latch.await();
116150

117151
// fire onNext (data) event
@@ -120,7 +154,7 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
120154
SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build());
121155

122156
// should enqueue data payload
123-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
157+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
124158
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
125159
assertNotNull(payload);
126160
assertEquals(syncContext, payload.getSyncContext());
@@ -131,18 +165,18 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
131165

132166
@Test
133167
void onErrorEnqueuesDataPayload() throws Exception {
134-
SyncStreamQueueSource connector =
168+
SyncStreamQueueSource queueSource =
135169
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
136170
latch = new CountDownLatch(1);
137-
connector.init();
171+
queueSource.init();
138172
latch.await();
139173

140174
// fire onError event and reset latch
141175
latch = new CountDownLatch(1);
142176
observer.onError(new Exception("fake exception"));
143177

144178
// should enqueue error payload
145-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
179+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
146180
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
147181
assertNotNull(payload);
148182
assertEquals(QueuePayloadType.ERROR, payload.getType());
@@ -153,18 +187,18 @@ void onErrorEnqueuesDataPayload() throws Exception {
153187

154188
@Test
155189
void onCompletedEnqueuesDataPayload() throws Exception {
156-
SyncStreamQueueSource connector =
190+
SyncStreamQueueSource queueSource =
157191
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
158192
latch = new CountDownLatch(1);
159-
connector.init();
193+
queueSource.init();
160194
latch.await();
161195

162196
// fire onCompleted event (graceful stream end) and reset latch
163197
latch = new CountDownLatch(1);
164198
observer.onCompleted();
165199

166200
// should enqueue error payload
167-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
201+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
168202
assertTrue(streamQueue.isEmpty());
169203
// should have restarted the stream (2 calls)
170204
latch.await();

0 commit comments

Comments
 (0)