Skip to content

Commit f91c782

Browse files
committed
fixup: add sleep delay to prevent busy loop
Signed-off-by: Todd Baert <[email protected]>
1 parent 1eccc02 commit f91c782

File tree

3 files changed

+95
-87
lines changed

3 files changed

+95
-87
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 15 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,11 @@
2626
/** gRPC channel builder helper. */
2727
public class ChannelBuilder {
2828

29-
@SuppressWarnings({"unchecked", "rawtypes"})
30-
private static final Map<String, ?> DEFAULT_RETRY_POLICY = new HashMap() {
31-
{
32-
// 1s + 2s + 4s
33-
put("maxAttempts", 3.0); // types used here are important, need to be doubles
34-
put("initialBackoff", "1s");
35-
put("maxBackoff", "5s");
36-
put("backoffMultiplier", 2.0);
37-
// status codes to retry on:
38-
put(
39-
"retryableStatusCodes",
40-
Arrays.asList(
41-
/*
42-
* Only states UNAVAILABLE and UNKNOWN should be retried. All
43-
* other failure states will probably not be resolved with a simple retry.
44-
*/
45-
Code.UNAVAILABLE.toString(), Code.UNKNOWN.toString()));
46-
}
47-
};
29+
/**
30+
* Maximum retry backoff in seconds.
31+
* Also used by SyncStreamQueueSource to control outer loop retry delay.
32+
*/
33+
public static final int MAX_RETRY_BACKOFF_SECONDS = 5;
4834

4935
/**
5036
* Controls retry (not-reconnection) policy for failed RPCs.
@@ -67,46 +53,23 @@ public class ChannelBuilder {
6753
put("service", "flagd.evaluation.v1.Service");
6854
}
6955
}));
70-
put("retryPolicy", DEFAULT_RETRY_POLICY);
71-
}
72-
73-
{
74-
put("name", Arrays.asList(new HashMap() {
75-
{
76-
put("service", "flagd.sync.v1.FlagSyncService");
77-
put("method", "SyncFlags");
78-
}
79-
}));
80-
put("retryPolicy", new HashMap(DEFAULT_RETRY_POLICY) {
56+
put("retryPolicy", new HashMap() {
8157
{
82-
// 1s + 2s + 4s + 5s + 5s + 5s + 5s + 5s + 5s + 5s
83-
put("maxAttempts", 12.0);
84-
// for streaming Retry on more status codes
58+
// 1 + 2 + 4
59+
put("maxAttempts", 3.0); // types used here are important, need to be doubles
60+
put("initialBackoff", "1s");
61+
put("maxBackoff", String.format("%ds", MAX_RETRY_BACKOFF_SECONDS));
62+
put("backoffMultiplier", 2.0);
63+
// status codes to retry on:
8564
put(
8665
"retryableStatusCodes",
8766
Arrays.asList(
8867
/*
89-
* All codes are retryable except OK and DEADLINE_EXCEEDED since
90-
* any others not listed here cause a very tight loop of retries.
91-
* DEADLINE_EXCEEDED is typically a result of a client specified
92-
* deadline,and definitionally should not result in a tight loop
93-
* (it's a timeout).
68+
* As per gRPC spec, the following status codes are safe to retry:
69+
* UNAVAILABLE, UNKNOWN,
9470
*/
95-
Code.CANCELLED.toString(),
9671
Code.UNKNOWN.toString(),
97-
Code.INVALID_ARGUMENT.toString(),
98-
Code.NOT_FOUND.toString(),
99-
Code.ALREADY_EXISTS.toString(),
100-
Code.PERMISSION_DENIED.toString(),
101-
Code.RESOURCE_EXHAUSTED.toString(),
102-
Code.FAILED_PRECONDITION.toString(),
103-
Code.ABORTED.toString(),
104-
Code.OUT_OF_RANGE.toString(),
105-
Code.UNIMPLEMENTED.toString(),
106-
Code.INTERNAL.toString(),
107-
Code.UNAVAILABLE.toString(),
108-
Code.DATA_LOSS.toString(),
109-
Code.UNAUTHENTICATED.toString()));
72+
Code.UNAVAILABLE.toString()));
11073
}
11174
});
11275
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"},
3636
justification = "Random is used to generate a variation & flag configurations require exposing")
3737
public class SyncStreamQueueSource implements QueueSource {
38+
/**
39+
* Delay between retries of the outer loop, in milliseconds. This is derived from the channel retry (5s)
40+
*/
41+
public static final int RETRY_LOOP_DELAY_MS = ChannelBuilder.MAX_RETRY_BACKOFF_SECONDS * 1000;
3842
private static final int QUEUE_SIZE = 5;
3943

4044
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -110,26 +114,32 @@ private void observeSyncStream() {
110114
log.info("Initializing sync stream observer");
111115

112116
// outer loop for re-issuing the stream request
113-
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
117+
// "waitForReady" on the channel, plus our retry policy slow this loop down in
118+
// error conditions
114119
while (!shutdown.get()) {
115-
log.debug("Initializing sync stream request");
116-
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
117-
118-
try {
119-
observer.metadata = getMetadata();
120-
} catch (Exception metaEx) {
121-
// retry if getMetadata fails
122-
String message = metaEx.getMessage();
123-
log.debug("Metadata request error: {}, will restart", message, metaEx);
124-
enqueueError(String.format("Error in getMetadata request: %s", message));
125-
continue;
126-
}
127-
128120
try {
129-
syncFlags(observer);
130-
} catch (Exception ex) {
131-
log.error("Unexpected sync stream exception, will restart.", ex);
132-
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
121+
log.debug("Initializing sync stream request");
122+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
123+
try {
124+
observer.metadata = getMetadata();
125+
} catch (Exception metaEx) {
126+
// retry if getMetadata fails
127+
String message = metaEx.getMessage();
128+
log.debug("Metadata request error: {}, will restart", message, metaEx);
129+
enqueueError(String.format("Error in getMetadata request: %s", message));
130+
Thread.sleep(RETRY_LOOP_DELAY_MS);
131+
continue;
132+
}
133+
134+
try {
135+
syncFlags(observer);
136+
} catch (Exception ex) {
137+
log.error("Unexpected sync stream exception, will restart.", ex);
138+
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
139+
Thread.sleep(RETRY_LOOP_DELAY_MS);
140+
}
141+
} catch (InterruptedException ie) {
142+
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
133143
}
134144
}
135145

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
2424
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
2525
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
26+
import io.grpc.StatusRuntimeException;
2627
import io.grpc.stub.StreamObserver;
2728
import java.util.concurrent.BlockingQueue;
2829
import java.util.concurrent.CountDownLatch;
@@ -35,11 +36,12 @@ class SyncStreamQueueSourceTest {
3536
private ChannelConnector mockConnector;
3637
private FlagSyncServiceBlockingStub blockingStub;
3738
private FlagSyncServiceStub stub;
39+
private FlagSyncServiceStub errorStub;
3840
private StreamObserver<SyncFlagsResponse> observer;
3941
private CountDownLatch latch; // used to wait for observer to be initialized
4042

4143
@BeforeEach
42-
public void init() throws Exception {
44+
public void setup() throws Exception {
4345
blockingStub = mock(FlagSyncServiceBlockingStub.class);
4446
when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub);
4547
when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance());
@@ -57,22 +59,54 @@ public void init() throws Exception {
5759
})
5860
.when(stub)
5961
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
60-
// method
62+
63+
errorStub = mock(FlagSyncServiceStub.class);
64+
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub);
65+
doAnswer((Answer<Void>) invocation -> {
66+
Object[] args = invocation.getArguments();
67+
observer = (StreamObserver<SyncFlagsResponse>) args[1];
68+
latch.countDown();
69+
throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND);
70+
})
71+
.when(errorStub)
72+
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
73+
}
74+
75+
@Test
76+
void initError_DoesNotBusyWait() throws Exception {
77+
// make sure we do not spin in a busy loop on errors
78+
79+
SyncStreamQueueSource queueSource =
80+
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, errorStub, blockingStub);
81+
latch = new CountDownLatch(1);
82+
queueSource.init();
83+
latch.await();
84+
85+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
86+
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
87+
assertNotNull(payload);
88+
assertEquals(QueuePayloadType.ERROR, payload.getType());
89+
Thread.sleep(SyncStreamQueueSource.RETRY_LOOP_DELAY_MS + 1000); // wait for retries
90+
91+
// should have retried the stream (2 calls); initial + 1 retry
92+
// it's very important that the retry count is low, to confirm no busy-loop
93+
verify(errorStub, times(2)).syncFlags(any(), any());
6194
}
6295

96+
6397
@Test
6498
void onNextEnqueuesDataPayload() throws Exception {
65-
SyncStreamQueueSource connector =
99+
SyncStreamQueueSource queueSource =
66100
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
67101
latch = new CountDownLatch(1);
68-
connector.init();
102+
queueSource.init();
69103
latch.await();
70104

71105
// fire onNext (data) event
72106
observer.onNext(SyncFlagsResponse.newBuilder().build());
73107

74108
// should enqueue data payload
75-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
109+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
76110
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
77111
assertNotNull(payload);
78112
assertNotNull(payload.getSyncContext());
@@ -81,20 +115,21 @@ void onNextEnqueuesDataPayload() throws Exception {
81115
verify(stub, times(1)).syncFlags(any(), any());
82116
}
83117

118+
84119
@Test
85120
void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
86121
// disable GetMetadata call
87-
SyncStreamQueueSource connector = new SyncStreamQueueSource(
122+
SyncStreamQueueSource queueSource = new SyncStreamQueueSource(
88123
FlagdOptions.builder().syncMetadataDisabled(true).build(), mockConnector, stub, blockingStub);
89124
latch = new CountDownLatch(1);
90-
connector.init();
125+
queueSource.init();
91126
latch.await();
92127

93128
// fire onNext (data) event
94129
observer.onNext(SyncFlagsResponse.newBuilder().build());
95130

96131
// should enqueue data payload
97-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
132+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
98133
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
99134
assertNotNull(payload);
100135
assertNull(payload.getSyncContext());
@@ -108,10 +143,10 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
108143
@Test
109144
void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
110145
// disable GetMetadata call
111-
SyncStreamQueueSource connector =
146+
SyncStreamQueueSource queueSource =
112147
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
113148
latch = new CountDownLatch(1);
114-
connector.init();
149+
queueSource.init();
115150
latch.await();
116151

117152
// fire onNext (data) event
@@ -120,7 +155,7 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
120155
SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build());
121156

122157
// should enqueue data payload
123-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
158+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
124159
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
125160
assertNotNull(payload);
126161
assertEquals(syncContext, payload.getSyncContext());
@@ -131,18 +166,18 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
131166

132167
@Test
133168
void onErrorEnqueuesDataPayload() throws Exception {
134-
SyncStreamQueueSource connector =
169+
SyncStreamQueueSource queueSource =
135170
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
136171
latch = new CountDownLatch(1);
137-
connector.init();
172+
queueSource.init();
138173
latch.await();
139174

140175
// fire onError event and reset latch
141176
latch = new CountDownLatch(1);
142177
observer.onError(new Exception("fake exception"));
143178

144179
// should enqueue error payload
145-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
180+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
146181
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
147182
assertNotNull(payload);
148183
assertEquals(QueuePayloadType.ERROR, payload.getType());
@@ -153,18 +188,18 @@ void onErrorEnqueuesDataPayload() throws Exception {
153188

154189
@Test
155190
void onCompletedEnqueuesDataPayload() throws Exception {
156-
SyncStreamQueueSource connector =
191+
SyncStreamQueueSource queueSource =
157192
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
158193
latch = new CountDownLatch(1);
159-
connector.init();
194+
queueSource.init();
160195
latch.await();
161196

162197
// fire onCompleted event (graceful stream end) and reset latch
163198
latch = new CountDownLatch(1);
164199
observer.onCompleted();
165200

166201
// should enqueue error payload
167-
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
202+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
168203
assertTrue(streamQueue.isEmpty());
169204
// should have restarted the stream (2 calls)
170205
latch.await();

0 commit comments

Comments
 (0)