2323import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataResponse ;
2424import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsRequest ;
2525import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsResponse ;
26+ import io .grpc .StatusRuntimeException ;
2627import io .grpc .stub .StreamObserver ;
2728import java .util .concurrent .BlockingQueue ;
2829import java .util .concurrent .CountDownLatch ;
@@ -35,11 +36,12 @@ class SyncStreamQueueSourceTest {
3536 private ChannelConnector mockConnector ;
3637 private FlagSyncServiceBlockingStub blockingStub ;
3738 private FlagSyncServiceStub stub ;
39+ private FlagSyncServiceStub errorStub ;
3840 private StreamObserver <SyncFlagsResponse > observer ;
3941 private CountDownLatch latch ; // used to wait for observer to be initialized
4042
4143 @ BeforeEach
42- public void init () throws Exception {
44+ public void setup () throws Exception {
4345 blockingStub = mock (FlagSyncServiceBlockingStub .class );
4446 when (blockingStub .withDeadlineAfter (anyLong (), any ())).thenReturn (blockingStub );
4547 when (blockingStub .getMetadata (any ())).thenReturn (GetMetadataResponse .getDefaultInstance ());
@@ -57,22 +59,53 @@ public void init() throws Exception {
5759 })
5860 .when (stub )
5961 .syncFlags (any (SyncFlagsRequest .class ), any (StreamObserver .class )); // Mock the initialize
60- // method
62+
63+ errorStub = mock (FlagSyncServiceStub .class );
64+ when (errorStub .withDeadlineAfter (anyLong (), any ())).thenReturn (errorStub );
65+ doAnswer ((Answer <Void >) invocation -> {
66+ Object [] args = invocation .getArguments ();
67+ observer = (StreamObserver <SyncFlagsResponse >) args [1 ];
68+ latch .countDown ();
69+ throw new StatusRuntimeException (io .grpc .Status .NOT_FOUND );
70+ })
71+ .when (errorStub )
72+ .syncFlags (any (SyncFlagsRequest .class ), any (StreamObserver .class )); // Mock the initialize
6173 }
6274
75+ // @Test
76+ // void initError_DoesNotBusyWait() throws Exception {
77+ // // make sure we do not spin in a busy loop on errors
78+
79+ // SyncStreamQueueSource queueSource =
80+ // new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, errorStub, blockingStub);
81+ // latch = new CountDownLatch(1);
82+ // queueSource.init();
83+ // latch.await();
84+
85+ // BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
86+ // QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
87+ // assertNotNull(payload);
88+ // assertEquals(QueuePayloadType.ERROR, payload.getType());
89+ // Thread.sleep(SyncStreamQueueSource.RETRY_LOOP_DELAY_MS + 1000); // wait for retries
90+
91+ // // should have retried the stream (2 calls); initial + 1 retry
92+ // // it's very important that the retry count is low, to confirm no busy-loop
93+ // verify(errorStub, times(2)).syncFlags(any(), any());
94+ // }
95+
6396 @ Test
6497 void onNextEnqueuesDataPayload () throws Exception {
65- SyncStreamQueueSource connector =
98+ SyncStreamQueueSource queueSource =
6699 new SyncStreamQueueSource (FlagdOptions .builder ().build (), mockConnector , stub , blockingStub );
67100 latch = new CountDownLatch (1 );
68- connector .init ();
101+ queueSource .init ();
69102 latch .await ();
70103
71104 // fire onNext (data) event
72105 observer .onNext (SyncFlagsResponse .newBuilder ().build ());
73106
74107 // should enqueue data payload
75- BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
108+ BlockingQueue <QueuePayload > streamQueue = queueSource .getStreamQueue ();
76109 QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
77110 assertNotNull (payload );
78111 assertNotNull (payload .getSyncContext ());
@@ -81,20 +114,21 @@ void onNextEnqueuesDataPayload() throws Exception {
81114 verify (stub , times (1 )).syncFlags (any (), any ());
82115 }
83116
117+
84118 @ Test
85119 void onNextEnqueuesDataPayloadMetadataDisabled () throws Exception {
86120 // disable GetMetadata call
87- SyncStreamQueueSource connector = new SyncStreamQueueSource (
121+ SyncStreamQueueSource queueSource = new SyncStreamQueueSource (
88122 FlagdOptions .builder ().syncMetadataDisabled (true ).build (), mockConnector , stub , blockingStub );
89123 latch = new CountDownLatch (1 );
90- connector .init ();
124+ queueSource .init ();
91125 latch .await ();
92126
93127 // fire onNext (data) event
94128 observer .onNext (SyncFlagsResponse .newBuilder ().build ());
95129
96130 // should enqueue data payload
97- BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
131+ BlockingQueue <QueuePayload > streamQueue = queueSource .getStreamQueue ();
98132 QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
99133 assertNotNull (payload );
100134 assertNull (payload .getSyncContext ());
@@ -108,10 +142,10 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
108142 @ Test
109143 void onNextEnqueuesDataPayloadWithSyncContext () throws Exception {
110144 // disable GetMetadata call
111- SyncStreamQueueSource connector =
145+ SyncStreamQueueSource queueSource =
112146 new SyncStreamQueueSource (FlagdOptions .builder ().build (), mockConnector , stub , blockingStub );
113147 latch = new CountDownLatch (1 );
114- connector .init ();
148+ queueSource .init ();
115149 latch .await ();
116150
117151 // fire onNext (data) event
@@ -120,7 +154,7 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
120154 SyncFlagsResponse .newBuilder ().setSyncContext (syncContext ).build ());
121155
122156 // should enqueue data payload
123- BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
157+ BlockingQueue <QueuePayload > streamQueue = queueSource .getStreamQueue ();
124158 QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
125159 assertNotNull (payload );
126160 assertEquals (syncContext , payload .getSyncContext ());
@@ -131,18 +165,18 @@ void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
131165
132166 @ Test
133167 void onErrorEnqueuesDataPayload () throws Exception {
134- SyncStreamQueueSource connector =
168+ SyncStreamQueueSource queueSource =
135169 new SyncStreamQueueSource (FlagdOptions .builder ().build (), mockConnector , stub , blockingStub );
136170 latch = new CountDownLatch (1 );
137- connector .init ();
171+ queueSource .init ();
138172 latch .await ();
139173
140174 // fire onError event and reset latch
141175 latch = new CountDownLatch (1 );
142176 observer .onError (new Exception ("fake exception" ));
143177
144178 // should enqueue error payload
145- BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
179+ BlockingQueue <QueuePayload > streamQueue = queueSource .getStreamQueue ();
146180 QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
147181 assertNotNull (payload );
148182 assertEquals (QueuePayloadType .ERROR , payload .getType ());
@@ -153,18 +187,18 @@ void onErrorEnqueuesDataPayload() throws Exception {
153187
154188 @ Test
155189 void onCompletedEnqueuesDataPayload () throws Exception {
156- SyncStreamQueueSource connector =
190+ SyncStreamQueueSource queueSource =
157191 new SyncStreamQueueSource (FlagdOptions .builder ().build (), mockConnector , stub , blockingStub );
158192 latch = new CountDownLatch (1 );
159- connector .init ();
193+ queueSource .init ();
160194 latch .await ();
161195
162196 // fire onCompleted event (graceful stream end) and reset latch
163197 latch = new CountDownLatch (1 );
164198 observer .onCompleted ();
165199
166200 // should enqueue error payload
167- BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
201+ BlockingQueue <QueuePayload > streamQueue = queueSource .getStreamQueue ();
168202 assertTrue (streamQueue .isEmpty ());
169203 // should have restarted the stream (2 calls)
170204 latch .await ();
0 commit comments