Skip to content

Commit 0c148a6

Browse files
committed
fixup: use deadline, pr feedback
Signed-off-by: Todd Baert <[email protected]>
1 parent 8594544 commit 0c148a6

File tree

4 files changed

+47
-37
lines changed

4 files changed

+47
-37
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
3131
/**
3232
* Create a gRPC stream that get notified about flag changes.
3333
*
34-
* @param sync synchronization object from caller
35-
* @param cache cache to update
36-
* @param onResponse lambda to call to handle the response
34+
* @param sync synchronization object from caller
35+
* @param cache cache to update
36+
* @param onConnectionEvent lambda to call to handle the response
3737
*/
38-
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onResponse) {
38+
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onConnectionEvent) {
3939
this.sync = sync;
4040
this.cache = cache;
41-
this.onConnectionEvent = onResponse;
41+
this.onConnectionEvent = onConnectionEvent;
4242
}
4343

4444
@Override

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
125125
private void observeEventStream() {
126126
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
127127
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
128-
this::grpconConnectionEvent);
128+
this::onConnectionEvent);
129129
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
130130

131131
try {
@@ -156,10 +156,10 @@ private void observeEventStream() {
156156
}
157157

158158
log.error("failed to connect to event stream, exhausted retries");
159-
this.grpconConnectionEvent(false, Collections.emptyList());
159+
this.onConnectionEvent(false, Collections.emptyList());
160160
}
161161

162-
private void grpcOnConnectionEvent(final boolean connected, final List<String> changedFlags) {
162+
private void onConnectionEvent(final boolean connected, final List<String> changedFlags) {
163163
// reset reconnection states
164164
if (connected) {
165165
this.eventStreamAttempt = 1;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,12 @@
3535
"EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing")
3636
public class GrpcStreamConnector implements Connector {
3737
private static final Random RANDOM = new Random();
38-
3938
private static final int INIT_BACK_OFF = 2 * 1000;
4039
private static final int MAX_BACK_OFF = 120 * 1000;
41-
4240
private static final int QUEUE_SIZE = 5;
4341

4442
private final AtomicBoolean shutdown = new AtomicBoolean(false);
4543
private final BlockingQueue<QueuePayload> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46-
4744
private final ManagedChannel channel;
4845
private final FlagSyncServiceStub serviceStub;
4946
private final FlagSyncServiceBlockingStub serviceBlockingStub;
@@ -69,7 +66,7 @@ public GrpcStreamConnector(final FlagdOptions options) {
6966
public void init() {
7067
Thread listener = new Thread(() -> {
7168
try {
72-
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector);
69+
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline);
7370
} catch (InterruptedException e) {
7471
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
7572
Thread.currentThread().interrupt();
@@ -118,7 +115,8 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
118115
final AtomicBoolean shutdown,
119116
final FlagSyncServiceStub serviceStub,
120117
final FlagSyncServiceBlockingStub serviceBlockingStub,
121-
final String selector)
118+
final String selector,
119+
final int deadline)
122120
throws InterruptedException {
123121

124122
final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
@@ -137,9 +135,11 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
137135
syncRequest.setSelector(selector);
138136
}
139137

140-
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
138+
serviceStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).syncFlags(syncRequest.build(),
139+
new GrpcStreamHandler(streamReceiver));
141140
try {
142-
GetMetadataResponse metadataResponse = serviceBlockingStub.getMetadata(metadataRequest.build());
141+
GetMetadataResponse metadataResponse = serviceBlockingStub
142+
.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build());
143143
metadata = convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()).asObjectMap();
144144
} catch (Exception e) {
145145
// the chances this call fails but the syncRequest does not are slim

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;
22

3-
import static org.junit.Assert.assertThat;
43
import static org.junit.jupiter.api.Assertions.assertEquals;
54
import static org.junit.jupiter.api.Assertions.assertNotNull;
65
import static org.junit.jupiter.api.Assertions.assertNull;
76
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
87
import static org.junit.jupiter.api.Assertions.assertTrue;
98
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.ArgumentMatchers.anyLong;
10+
import static org.mockito.Mockito.doAnswer;
11+
import static org.mockito.Mockito.mock;
12+
import static org.mockito.Mockito.timeout;
1013
import static org.mockito.Mockito.times;
1114
import static org.mockito.Mockito.verify;
1215
import static org.mockito.Mockito.when;
@@ -17,7 +20,6 @@
1720
import java.util.concurrent.TimeUnit;
1821

1922
import org.junit.jupiter.api.Test;
20-
import org.mockito.Mockito;
2123

2224
import com.google.protobuf.Struct;
2325

@@ -39,21 +41,25 @@ public void connectionParameters() throws Throwable {
3941
// given
4042
final FlagdOptions options = FlagdOptions.builder()
4143
.selector("selector")
44+
.deadline(1337)
4245
.build();
4346

4447
final GrpcStreamConnector connector = new GrpcStreamConnector(options);
4548
final FlagSyncServiceStub stubMock = mockStubAndReturn(connector);
46-
49+
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
4750
final SyncFlagsRequest[] request = new SyncFlagsRequest[1];
4851

49-
Mockito.doAnswer(invocation -> {
52+
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
53+
doAnswer(invocation -> {
5054
request[0] = invocation.getArgument(0, SyncFlagsRequest.class);
5155
return null;
5256
}).when(stubMock).syncFlags(any(), any());
5357

5458
// when
5559
connector.init();
56-
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
60+
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
61+
verify(stubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);
62+
verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);
5763

5864
// then
5965
final SyncFlagsRequest flagsRequest = request[0];
@@ -70,25 +76,27 @@ public void grpcConnectionStatus() throws Throwable {
7076
final FlagSyncServiceStub stubMock = mockStubAndReturn(connector);
7177
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
7278

73-
final Struct metadata = Struct.newBuilder()
74-
.putFields(key,
75-
com.google.protobuf.Value.newBuilder().setStringValue(val).build())
76-
.build();
77-
79+
final Struct metadata = Struct.newBuilder()
80+
.putFields(key,
81+
com.google.protobuf.Value.newBuilder().setStringValue(val).build())
82+
.build();
7883

79-
when(blockingStubMock.getMetadata(any())).thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build());
84+
when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock);
85+
when(blockingStubMock.getMetadata(any()))
86+
.thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build());
8087

8188
final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];
8289

83-
Mockito.doAnswer(invocation -> {
90+
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
91+
doAnswer(invocation -> {
8492
injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class);
8593
return null;
8694
}).when(stubMock).syncFlags(any(), any());
8795

8896
// when
8997
connector.init();
9098
// verify and wait for initialization
91-
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
99+
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
92100
verify(blockingStubMock).getMetadata(any());
93101

94102
// then
@@ -133,15 +141,17 @@ public void listenerExitOnShutdown() throws Throwable {
133141

134142
final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];
135143

136-
Mockito.doAnswer(invocation -> {
144+
when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock);
145+
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
146+
doAnswer(invocation -> {
137147
injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class);
138148
return null;
139149
}).when(stubMock).syncFlags(any(), any());
140150

141151
// when
142152
connector.init();
143153
// verify and wait for initialization
144-
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
154+
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
145155
verify(blockingStubMock).getMetadata(any());
146156

147157
// then
@@ -176,23 +186,23 @@ private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector c
176186
final Field serviceStubField = GrpcStreamConnector.class.getDeclaredField("serviceStub");
177187
serviceStubField.setAccessible(true);
178188

179-
final FlagSyncServiceStub stubMock = Mockito.mock(FlagSyncServiceStub.class);
189+
final FlagSyncServiceStub stubMock = mock(FlagSyncServiceStub.class);
180190

181191
serviceStubField.set(connector, stubMock);
182192

183193
return stubMock;
184194
}
185195

186196
private static FlagSyncServiceBlockingStub mockBlockingStubAndReturn(final GrpcStreamConnector connector)
187-
throws Throwable {
188-
final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub");
189-
blockingStubField.setAccessible(true);
197+
throws Throwable {
198+
final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub");
199+
blockingStubField.setAccessible(true);
190200

191-
final FlagSyncServiceBlockingStub blockingStubMock = Mockito.mock(FlagSyncServiceBlockingStub.class);
201+
final FlagSyncServiceBlockingStub blockingStubMock = mock(FlagSyncServiceBlockingStub.class);
192202

193-
blockingStubField.set(connector, blockingStubMock);
203+
blockingStubField.set(connector, blockingStubMock);
194204

195-
return blockingStubMock;
205+
return blockingStubMock;
196206
}
197207

198208
}

0 commit comments

Comments
 (0)