Skip to content

Commit 6466de8

Browse files
committed
fix(flagd): Remove one level of queueing in SyncStreamQueueSource and added Stream cancellation
Signed-off-by: Guido Breitenhuber <[email protected]>
1 parent 60544cf commit 6466de8

File tree

2 files changed

+89
-67
lines changed

2 files changed

+89
-67
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 85 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
55
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
66
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
7-
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
8-
import dev.openfeature.contrib.providers.flagd.resolver.common.StreamResponseModel;
97
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
108
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
119
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
@@ -17,13 +15,13 @@
1715
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
1816
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1917
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20-
import io.grpc.Context;
21-
import io.grpc.Context.CancellableContext;
2218
import java.util.concurrent.BlockingQueue;
2319
import java.util.concurrent.LinkedBlockingQueue;
2420
import java.util.concurrent.TimeUnit;
2521
import java.util.concurrent.atomic.AtomicBoolean;
2622
import java.util.function.Consumer;
23+
import io.grpc.Context;
24+
import io.grpc.stub.StreamObserver;
2725
import lombok.extern.slf4j.Slf4j;
2826

2927
/**
@@ -43,8 +41,6 @@ public class SyncStreamQueueSource implements QueueSource {
4341
private final String providerId;
4442
private final boolean syncMetadataDisabled;
4543
private final ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> channelConnector;
46-
private final LinkedBlockingQueue<StreamResponseModel<SyncFlagsResponse>> incomingQueue =
47-
new LinkedBlockingQueue<>(QUEUE_SIZE);
4844
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
4945
private final FlagSyncServiceStub stub;
5046
private final FlagSyncServiceBlockingStub blockingStub;
@@ -115,82 +111,57 @@ public void shutdown() throws InterruptedException {
115111

116112
/** Contains blocking calls, to be used concurrently. */
117113
private void observeSyncStream() throws InterruptedException {
118-
119114
log.info("Initializing sync stream observer");
120115

121116
// outer loop for re-issuing the stream request
122117
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
123118
while (!shutdown.get()) {
124-
125119
log.debug("Initializing sync stream request");
126-
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
127-
GetMetadataResponse metadataResponse = null;
128-
129-
// create a context which exists to track and cancel the stream
130-
try (CancellableContext context = Context.current().withCancellation()) {
131-
132-
restart(); // start the stream with the context
133120

134-
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
135-
if (!syncMetadataDisabled) {
136-
try {
137-
FlagSyncServiceBlockingStub localStub = blockingStub;
138-
139-
if (deadline > 0) {
140-
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
141-
}
142-
143-
metadataResponse = localStub.getMetadata(metadataRequest.build());
144-
} catch (Exception metaEx) {
145-
// cancel the stream if the getMetadata fails
146-
// we can keep this log quiet since the stream is cancelled/restarted with this exception
147-
log.debug("Metadata exception: {}, cancelling stream", metaEx.getMessage(), metaEx);
148-
context.cancel(metaEx);
121+
try (Context.CancellableContext context = Context.current().withCancellation()) {
122+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, context);
123+
Context previous = context.attach();
124+
try {
125+
if (!syncMetadataDisabled) {
126+
observer.metadata = getMetadata(context);
149127
}
128+
129+
syncFlags(observer);
130+
} finally {
131+
context.detach(previous);
150132
}
133+
}
134+
}
151135

152-
// inner loop for handling messages
153-
while (!shutdown.get() && !Context.current().isCancelled()) {
154-
final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
155-
if (taken.isComplete()) {
156-
log.debug("Sync stream completed, will restart");
157-
// The stream is complete, we still try to reconnect
158-
break;
159-
}
136+
log.info("Shutdown invoked, exiting event stream listener");
137+
}
160138

161-
Throwable streamException = taken.getError();
162-
if (streamException != null) {
163-
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
164-
if (!outgoingQueue.offer(new QueuePayload(
165-
QueuePayloadType.ERROR,
166-
String.format("Error from stream: %s", streamException.getMessage())))) {
167-
log.error("Failed to convey ERROR status, queue is full");
168-
}
169-
break;
170-
}
139+
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
140+
private static final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
141+
private Struct getMetadata(Context.CancellableContext context) {
142+
try {
143+
FlagSyncServiceBlockingStub localStub = blockingStub;
171144

172-
final SyncFlagsResponse flagsResponse = taken.getResponse();
173-
final String data = flagsResponse.getFlagConfiguration();
174-
log.debug("Got stream response: {}", data);
145+
if (deadline > 0) {
146+
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
147+
}
175148

176-
Struct syncContext = null;
177-
if (flagsResponse.hasSyncContext()) {
178-
syncContext = flagsResponse.getSyncContext();
179-
} else if (metadataResponse != null) {
180-
syncContext = metadataResponse.getMetadata();
181-
}
149+
GetMetadataResponse metadataResponse = localStub.getMetadata(metadataRequest.build());
182150

183-
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
184-
log.error("Stream writing failed");
185-
}
186-
}
151+
if (metadataResponse != null) {
152+
return metadataResponse.getMetadata();
187153
}
154+
} catch (Exception metaEx) {
155+
// retry getMetadata fails
156+
// we can keep this log quiet since the stream is cancelled/restarted with this exception
157+
log.debug("Metadata exception: {}, retrying", metaEx.getMessage(), metaEx);
158+
context.cancel(metaEx);
188159
}
189160

190-
log.info("Shutdown invoked, exiting event stream listener");
161+
return null;
191162
}
192163

193-
private void restart() {
164+
private void syncFlags(SyncStreamObserver streamObserver) {
194165
FlagSyncServiceStub localStub = stub; // don't mutate the stub
195166
if (streamDeadline > 0) {
196167
localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS);
@@ -205,6 +176,56 @@ private void restart() {
205176
syncRequest.setProviderId(this.providerId);
206177
}
207178

208-
localStub.syncFlags(syncRequest.build(), new QueueingStreamObserver<SyncFlagsResponse>(incomingQueue));
179+
localStub.syncFlags(syncRequest.build(), streamObserver);
180+
181+
while (!shutdown.get() && !streamObserver.context.isCancelled()) {
182+
try {
183+
Thread.sleep(500);
184+
} catch (InterruptedException e) {
185+
log.warn("Sync stream interrupted, will restart", e);
186+
streamObserver.context.cancel(e);
187+
}
188+
}
189+
}
190+
191+
private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
192+
private final BlockingQueue<QueuePayload> outgoingQueue;
193+
private final Context.CancellableContext context;
194+
private Struct metadata;
195+
196+
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, Context.CancellableContext context) {
197+
this.outgoingQueue = outgoingQueue;
198+
this.context = context;
199+
}
200+
201+
@Override
202+
public void onNext(SyncFlagsResponse syncFlagsResponse) {
203+
final String data = syncFlagsResponse.getFlagConfiguration();
204+
log.debug("Got stream response: {}", data);
205+
206+
Struct syncContext = syncFlagsResponse.hasSyncContext() ? syncFlagsResponse.getSyncContext() : metadata;
207+
208+
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
209+
log.error("Stream writing failed");
210+
}
211+
}
212+
213+
@Override
214+
public void onError(Throwable throwable) {
215+
try {
216+
log.error("Stream error: {}, cancelling stream", throwable.getMessage(), throwable);
217+
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.ERROR, null, null))) {
218+
log.error("Stream writing failed");
219+
}
220+
} finally {
221+
context.cancel(throwable);
222+
}
223+
}
224+
225+
@Override
226+
public void onCompleted() {
227+
log.debug("Sync stream completed, will restart");
228+
context.cancel(null);
229+
}
209230
}
210231
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.BlockingQueue;
2828
import java.util.concurrent.CountDownLatch;
2929
import java.util.concurrent.TimeUnit;
30+
import io.grpc.stub.StreamObserver;
3031
import org.junit.jupiter.api.BeforeEach;
3132
import org.junit.jupiter.api.Test;
3233
import org.mockito.stubbing.Answer;
@@ -35,7 +36,7 @@ class SyncStreamQueueSourceTest {
3536
private ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> mockConnector;
3637
private FlagSyncServiceBlockingStub blockingStub;
3738
private FlagSyncServiceStub stub;
38-
private QueueingStreamObserver<SyncFlagsResponse> observer;
39+
private StreamObserver<SyncFlagsResponse> observer;
3940
private CountDownLatch latch; // used to wait for observer to be initialized
4041

4142
@BeforeEach
@@ -51,12 +52,12 @@ public void init() throws Exception {
5152
when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub);
5253
doAnswer((Answer<Void>) invocation -> {
5354
Object[] args = invocation.getArguments();
54-
observer = (QueueingStreamObserver<SyncFlagsResponse>) args[1];
55+
observer = (StreamObserver<SyncFlagsResponse>) args[1];
5556
latch.countDown();
5657
return null;
5758
})
5859
.when(stub)
59-
.syncFlags(any(SyncFlagsRequest.class), any(QueueingStreamObserver.class)); // Mock the initialize
60+
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
6061
// method
6162
}
6263

0 commit comments

Comments
 (0)