@@ -79,15 +79,7 @@ protected SyncStreamQueueSource(
7979 /** Initialize sync stream connector. */
8080 public void init () throws Exception {
8181 channelConnector .initialize ();
82- Thread listener = new Thread (() -> {
83- try {
84- observeSyncStream ();
85- } catch (InterruptedException e ) {
86- log .warn ("gRPC event stream interrupted, flag configurations are stale" , e );
87- Thread .currentThread ().interrupt ();
88- }
89- });
90-
82+ Thread listener = new Thread (this ::observeSyncStream );
9183 listener .setDaemon (true );
9284 listener .start ();
9385 }
@@ -110,7 +102,7 @@ public void shutdown() throws InterruptedException {
110102 }
111103
112104 /** Contains blocking calls, to be used concurrently. */
113- private void observeSyncStream () throws InterruptedException {
105+ private void observeSyncStream () {
114106 log .info ("Initializing sync stream observer" );
115107
116108 // outer loop for re-issuing the stream request
@@ -123,14 +115,17 @@ private void observeSyncStream() throws InterruptedException {
123115 observer .metadata = getMetadata ();
124116 } catch (Exception metaEx ) {
125117 // retry if getMetadata fails
126- log .warn ("Metadata request exception, retrying." , metaEx );
118+ String message = metaEx .getMessage ();
119+ log .debug ("Metadata request error: {}, will restart" , message , metaEx );
120+ enqueueError (String .format ("Error in getMetadata request: %s" , message ));
127121 continue ;
128122 }
129123
130124 try {
131125 syncFlags (observer );
132126 } catch (Exception ex ) {
133- log .warn ("Sync stream exception, retrying." , ex );
127+ log .error ("Unexpected sync stream exception, will restart." , ex );
128+ enqueueError (String .format ("Error in getMetadata request: %s" , ex .getMessage ()));
134129 }
135130 }
136131
@@ -178,6 +173,17 @@ private void syncFlags(SyncStreamObserver streamObserver) {
178173 streamObserver .done .await ();
179174 }
180175
176+ private void enqueueError (String message ) {
177+ enqueueError (outgoingQueue , message );
178+ }
179+
180+ private static void enqueueError (BlockingQueue <QueuePayload > queue , String message ) {
181+ if (!queue .offer (new QueuePayload (
182+ QueuePayloadType .ERROR , message , null ))) {
183+ log .error ("Failed to convey ERROR status, queue is full" );
184+ }
185+ }
186+
181187 private static class SyncStreamObserver implements StreamObserver <SyncFlagsResponse > {
182188 private final BlockingQueue <QueuePayload > outgoingQueue ;
183189 private final Awaitable done = new Awaitable ();
@@ -205,10 +211,7 @@ public void onError(Throwable throwable) {
205211 try {
206212 String message = throwable != null ? throwable .getMessage () : "unknown" ;
207213 log .debug ("Stream error: {}, will restart" , message , throwable );
208- if (!outgoingQueue .offer (new QueuePayload (
209- QueuePayloadType .ERROR , String .format ("Error from stream: %s" , message ), null ))) {
210- log .error ("Failed to convey ERROR status, queue is full" );
211- }
214+ enqueueError (outgoingQueue , String .format ("Error from stream: %s" , message ));
212215 } finally {
213216 done .wakeup ();
214217 }
0 commit comments