1414import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataResponse ;
1515import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsRequest ;
1616import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsResponse ;
17+ import dev .openfeature .sdk .Awaitable ;
1718import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
18- import io .grpc .Context ;
1919import io .grpc .stub .StreamObserver ;
2020import java .util .concurrent .BlockingQueue ;
2121import java .util .concurrent .LinkedBlockingQueue ;
@@ -117,55 +117,48 @@ private void observeSyncStream() throws InterruptedException {
117117 // "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
118118 while (!shutdown .get ()) {
119119 log .debug ("Initializing sync stream request" );
120+ SyncStreamObserver observer = new SyncStreamObserver (outgoingQueue );
120121
121- try ( Context . CancellableContext context = Context . current (). withCancellation ()) {
122- SyncStreamObserver observer = new SyncStreamObserver ( outgoingQueue , context );
123- Context previous = context . attach ();
124- try {
125- if (! syncMetadataDisabled ) {
126- observer . metadata = getMetadata ( context ) ;
127- }
128-
129- syncFlags ( observer );
130- } finally {
131- context . detach ( previous );
132- }
122+ try {
123+ observer . metadata = getMetadata ( );
124+ } catch ( Exception metaEx ) {
125+ // retry if getMetadata fails
126+ log . warn ( "Metadata request exception, retrying." , metaEx );
127+ continue ;
128+ }
129+
130+ try {
131+ syncFlags ( observer );
132+ } catch ( Exception ex ) {
133+ log . warn ( "Sync stream exception, retrying." , ex );
133134 }
134135 }
135136
136137 log .info ("Shutdown invoked, exiting event stream listener" );
137138 }
138139
139140 // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
140- private Struct getMetadata (Context .CancellableContext context ) {
141- try {
142- FlagSyncServiceBlockingStub localStub = blockingStub ;
141+ private Struct getMetadata () {
142+ if (syncMetadataDisabled ) {
143+ return null ;
144+ }
143145
144- if (deadline > 0 ) {
145- localStub = localStub .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS );
146- }
146+ FlagSyncServiceBlockingStub localStub = blockingStub ;
147147
148- GetMetadataResponse metadataResponse = localStub .getMetadata (GetMetadataRequest .getDefaultInstance ());
148+ if (deadline > 0 ) {
149+ localStub = localStub .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS );
150+ }
149151
150- if (metadataResponse != null ) {
151- return metadataResponse .getMetadata ();
152- }
153- } catch (Exception metaEx ) {
154- // retry getMetadata fails
155- // we can keep this log quiet since the stream is cancelled/restarted with this exception
156- log .debug ("Metadata exception: {}, retrying" , metaEx .getMessage (), metaEx );
157- context .cancel (metaEx );
152+ GetMetadataResponse metadataResponse = localStub .getMetadata (GetMetadataRequest .getDefaultInstance ());
153+
154+ if (metadataResponse != null ) {
155+ return metadataResponse .getMetadata ();
158156 }
159157
160158 return null ;
161159 }
162160
163161 private void syncFlags (SyncStreamObserver streamObserver ) {
164- // No need to fire a sync request if the context is already cancelled
165- if (streamObserver .context .isCancelled ()) {
166- return ;
167- }
168-
169162 FlagSyncServiceStub localStub = stub ; // don't mutate the stub
170163 if (streamDeadline > 0 ) {
171164 localStub = localStub .withDeadlineAfter (streamDeadline , TimeUnit .MILLISECONDS );
@@ -182,29 +175,17 @@ private void syncFlags(SyncStreamObserver streamObserver) {
182175
183176 localStub .syncFlags (syncRequest .build (), streamObserver );
184177
185- while (!streamObserver .context .isCancelled ()) {
186- if (shutdown .get ()) {
187- streamObserver .context .cancel (null );
188- }
189-
190- try {
191- Thread .sleep (500 );
192- } catch (InterruptedException e ) {
193- log .warn ("Sync stream interrupted, will restart" , e );
194- streamObserver .context .cancel (e );
195- Thread .currentThread ().interrupt ();
196- }
197- }
178+ streamObserver .done .await ();
198179 }
199180
200181 private static class SyncStreamObserver implements StreamObserver <SyncFlagsResponse > {
201182 private final BlockingQueue <QueuePayload > outgoingQueue ;
202- private final Context .CancellableContext context ;
183+ private final Awaitable done = new Awaitable ();
184+
203185 private Struct metadata ;
204186
205- public SyncStreamObserver (BlockingQueue <QueuePayload > outgoingQueue , Context . CancellableContext context ) {
187+ public SyncStreamObserver (BlockingQueue <QueuePayload > outgoingQueue ) {
206188 this .outgoingQueue = outgoingQueue ;
207- this .context = context ;
208189 }
209190
210191 @ Override
@@ -229,14 +210,14 @@ public void onError(Throwable throwable) {
229210 log .error ("Failed to convey ERROR status, queue is full" );
230211 }
231212 } finally {
232- context . cancel ( throwable );
213+ done . wakeup ( );
233214 }
234215 }
235216
236217 @ Override
237218 public void onCompleted () {
238219 log .debug ("Sync stream completed, will restart" );
239- context . cancel ( null );
220+ done . wakeup ( );
240221 }
241222 }
242223}
0 commit comments