2020import java .util .concurrent .atomic .AtomicBoolean ;
2121
2222/**
23- * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
23+ * Implements the {@link Connector} contract and emit flags obtained from flagd
24+ * sync gRPC contract.
2425 */
2526@ Slf4j
26- @ SuppressFBWarnings (value = {"PREDICTABLE_RANDOM" , "EI_EXPOSE_REP" } ,
27- justification = "Random is used to generate a variation & flag configurations require exposing" )
27+ @ SuppressFBWarnings (value = { "PREDICTABLE_RANDOM" ,
28+ "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing" )
2829public class GrpcStreamConnector implements Connector {
2930 private static final Random RANDOM = new Random ();
3031
@@ -111,40 +112,47 @@ public void shutdown() throws InterruptedException {
111112 * Contains blocking calls, to be used concurrently.
112113 */
113114 static void observeEventStream (final BlockingQueue <StreamPayload > writeTo ,
114- final AtomicBoolean shutdown ,
115- final FlagSyncServiceStub serviceStub ,
116- final SyncFlagsRequest request )
115+ final AtomicBoolean shutdown ,
116+ final FlagSyncServiceStub serviceStub ,
117+ final SyncFlagsRequest request )
117118 throws InterruptedException {
118119
119120 final BlockingQueue <GrpcResponseModel > streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
120121 int retryDelay = INIT_BACK_OFF ;
121122
123+ log .info ("Initializing sync stream observer" );
124+
122125 while (!shutdown .get ()) {
126+ log .debug ("Initializing sync stream request" );
123127 serviceStub .syncFlags (request , new GrpcStreamHandler (streamReceiver ));
124128
125129 while (!shutdown .get ()) {
126130 final GrpcResponseModel response = streamReceiver .take ();
127131
128132 if (response .isComplete ()) {
129- // The stream is complete. This is not considered as an error
133+ log .info ("Sync stream completed" );
134+ // The stream is complete, this isn't really an error but we should try to
135+ // reconnect
130136 break ;
131137 }
132138
133139 if (response .getError () != null ) {
134- log .warn (String .format ("Error from grpc connection, retrying in %dms" , retryDelay ),
140+ log .error (String .format ("Error from grpc connection, retrying in %dms" , retryDelay ),
135141 response .getError ());
136142
137143 if (!writeTo .offer (
138144 new StreamPayload (StreamPayloadType .ERROR , "Error from stream connection, retrying" ))) {
139- log .warn ("Failed to convey ERROR satus , queue is full" );
145+ log .error ("Failed to convey ERROR status , queue is full" );
140146 }
141147 break ;
142148 }
143149
144150 final SyncFlagsResponse flagsResponse = response .getSyncFlagsResponse ();
151+ String data = flagsResponse .getFlagConfiguration ();
152+ log .debug ("Got stream response: " + data );
145153 if (!writeTo .offer (
146- new StreamPayload (StreamPayloadType .DATA , flagsResponse . getFlagConfiguration () ))) {
147- log .warn ("Stream writing failed" );
154+ new StreamPayload (StreamPayloadType .DATA , data ))) {
155+ log .error ("Stream writing failed" );
148156 }
149157
150158 // reset retry delay if we succeeded in a retry attempt
@@ -158,6 +166,7 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
158166 }
159167
160168 // busy wait till next attempt
169+ log .warn (String .format ("Stream failed, retrying in %dms" , retryDelay ));
161170 Thread .sleep (retryDelay + RANDOM .nextInt (INIT_BACK_OFF ));
162171
163172 if (retryDelay < MAX_BACK_OFF ) {
0 commit comments