11package dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .grpc ;
22
33import dev .openfeature .contrib .providers .flagd .FlagdOptions ;
4- import dev .openfeature .contrib .providers .flagd .resolver .common .ChannelBuilder ;
5- import dev .openfeature .contrib .providers .flagd .resolver .common . backoff . GrpcStreamConnectorBackoffService ;
4+ import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
5+ import dev .openfeature .contrib .providers .flagd .resolver .grpc . GrpcConnector ;
66import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .Connector ;
77import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayload ;
88import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayloadType ;
99import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc ;
10- import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc .FlagSyncServiceBlockingStub ;
11- import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc .FlagSyncServiceStub ;
1210import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataRequest ;
1311import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataResponse ;
1412import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsRequest ;
1513import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsResponse ;
1614import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
1715import io .grpc .Context ;
1816import io .grpc .Context .CancellableContext ;
19- import io .grpc .ManagedChannel ;
2017import java .util .concurrent .BlockingQueue ;
2118import java .util .concurrent .LinkedBlockingQueue ;
2219import java .util .concurrent .TimeUnit ;
2320import java .util .concurrent .atomic .AtomicBoolean ;
21+ import java .util .function .Consumer ;
2422import lombok .extern .slf4j .Slf4j ;
25- import org .slf4j .event .Level ;
2623
2724/**
2825 * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
@@ -36,42 +33,34 @@ public class GrpcStreamConnector implements Connector {
3633
3734 private final AtomicBoolean shutdown = new AtomicBoolean (false );
3835 private final BlockingQueue <QueuePayload > blockingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
39- private final ManagedChannel channel ;
40- private final FlagSyncServiceStub serviceStub ;
41- private final FlagSyncServiceBlockingStub serviceBlockingStub ;
4236 private final int deadline ;
43- private final int streamDeadlineMs ;
4437 private final String selector ;
45- private final int retryBackoffMillis ;
38+ private final GrpcConnector <
39+ FlagSyncServiceGrpc .FlagSyncServiceStub , FlagSyncServiceGrpc .FlagSyncServiceBlockingStub >
40+ grpcConnector ;
41+ private final LinkedBlockingQueue <GrpcResponseModel > streamReceiver ;
4642
4743 /**
48- * Construct a new GrpcStreamConnector.
49- *
50- * @param options flagd options
44+ * Creates a new GrpcStreamConnector responsible for observing the event stream
5145 */
52- public GrpcStreamConnector (final FlagdOptions options ) {
53- channel = ChannelBuilder .nettyChannel (options );
54- serviceStub = FlagSyncServiceGrpc .newStub (channel );
55- serviceBlockingStub = FlagSyncServiceGrpc .newBlockingStub (channel );
46+ public GrpcStreamConnector (final FlagdOptions options , Consumer <ConnectionEvent > onConnectionEvent ) {
5647 deadline = options .getDeadline ();
57- streamDeadlineMs = options .getStreamDeadlineMs ();
5848 selector = options .getSelector ();
59- retryBackoffMillis = options .getRetryBackoffMs ();
49+ streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
50+ grpcConnector = new GrpcConnector <>(
51+ options ,
52+ FlagSyncServiceGrpc ::newStub ,
53+ FlagSyncServiceGrpc ::newBlockingStub ,
54+ onConnectionEvent ,
55+ stub -> stub .syncFlags (SyncFlagsRequest .getDefaultInstance (), new GrpcStreamHandler (streamReceiver )));
6056 }
6157
6258 /** Initialize gRPC stream connector. */
63- public void init () {
59+ public void init () throws Exception {
60+ grpcConnector .initialize ();
6461 Thread listener = new Thread (() -> {
6562 try {
66- observeEventStream (
67- blockingQueue ,
68- shutdown ,
69- serviceStub ,
70- serviceBlockingStub ,
71- selector ,
72- deadline ,
73- streamDeadlineMs ,
74- retryBackoffMillis );
63+ observeEventStream (blockingQueue , shutdown , selector , deadline );
7564 } catch (InterruptedException e ) {
7665 log .warn ("gRPC event stream interrupted, flag configurations are stale" , e );
7766 Thread .currentThread ().interrupt ();
@@ -96,37 +85,17 @@ public void shutdown() throws InterruptedException {
9685 if (shutdown .getAndSet (true )) {
9786 return ;
9887 }
99-
100- try {
101- if (this .channel != null && !this .channel .isShutdown ()) {
102- this .channel .shutdown ();
103- this .channel .awaitTermination (this .deadline , TimeUnit .MILLISECONDS );
104- }
105- } finally {
106- if (this .channel != null && !this .channel .isShutdown ()) {
107- this .channel .shutdownNow ();
108- this .channel .awaitTermination (this .deadline , TimeUnit .MILLISECONDS );
109- log .warn (String .format ("Unable to shut down channel by %d deadline" , this .deadline ));
110- }
111- }
88+ this .grpcConnector .shutdown ();
11289 }
11390
11491 /** Contains blocking calls, to be used concurrently. */
115- static void observeEventStream (
92+ void observeEventStream (
11693 final BlockingQueue <QueuePayload > writeTo ,
11794 final AtomicBoolean shutdown ,
118- final FlagSyncServiceStub serviceStub ,
119- final FlagSyncServiceBlockingStub serviceBlockingStub ,
12095 final String selector ,
121- final int deadline ,
122- final int streamDeadlineMs ,
123- int retryBackoffMillis )
96+ final int deadline )
12497 throws InterruptedException {
12598
126- final BlockingQueue <GrpcResponseModel > streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
127- final GrpcStreamConnectorBackoffService backoffService =
128- new GrpcStreamConnectorBackoffService (retryBackoffMillis );
129-
13099 log .info ("Initializing sync stream observer" );
131100
132101 while (!shutdown .get ()) {
@@ -143,15 +112,10 @@ static void observeEventStream(
143112 }
144113
145114 try (CancellableContext context = Context .current ().withCancellation ()) {
146- FlagSyncServiceStub localServiceStub = serviceStub ;
147- if (streamDeadlineMs > 0 ) {
148- localServiceStub = localServiceStub .withDeadlineAfter (streamDeadlineMs , TimeUnit .MILLISECONDS );
149- }
150-
151- localServiceStub .syncFlags (syncRequest .build (), new GrpcStreamHandler (streamReceiver ));
152115
153116 try {
154- metadataResponse = serviceBlockingStub
117+ metadataResponse = grpcConnector
118+ .getResolver ()
155119 .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS )
156120 .getMetadata (metadataRequest .build ());
157121 } catch (Exception e ) {
@@ -164,27 +128,18 @@ static void observeEventStream(
164128
165129 while (!shutdown .get ()) {
166130 final GrpcResponseModel response = streamReceiver .take ();
167-
168131 if (response .isComplete ()) {
169132 log .info ("Sync stream completed" );
170- // The stream is complete, this isn't really an error but we should try to
133+ // The stream is complete, this isn't really an error, but we should try to
171134 // reconnect
172135 break ;
173136 }
174137
175138 Throwable streamException = response .getError ();
176139 if (streamException != null || metadataException != null ) {
177- long retryDelay = backoffService .getCurrentBackoffMillis ();
178-
179- // if we are in silent recover mode, we should not expose the error to the client
180- if (backoffService .shouldRetrySilently ()) {
181- logExceptions (Level .INFO , streamException , metadataException , retryDelay );
182- } else {
183- logExceptions (Level .ERROR , streamException , metadataException , retryDelay );
184- if (!writeTo .offer (new QueuePayload (
185- QueuePayloadType .ERROR , "Error from stream or metadata" , metadataResponse ))) {
186- log .error ("Failed to convey ERROR status, queue is full" );
187- }
140+ if (!writeTo .offer (new QueuePayload (
141+ QueuePayloadType .ERROR , "Error from stream or metadata" , metadataResponse ))) {
142+ log .error ("Failed to convey ERROR status, queue is full" );
188143 }
189144
190145 // close the context to cancel the stream in case just the metadata call failed
@@ -199,34 +154,10 @@ static void observeEventStream(
199154 if (!writeTo .offer (new QueuePayload (QueuePayloadType .DATA , data , metadataResponse ))) {
200155 log .error ("Stream writing failed" );
201156 }
202-
203- // reset backoff if we succeeded in a retry attempt
204- backoffService .reset ();
205157 }
206158 }
207-
208- // check for shutdown and avoid sleep
209- if (!shutdown .get ()) {
210- log .debug ("Stream failed, retrying in {}ms" , backoffService .getCurrentBackoffMillis ());
211- backoffService .waitUntilNextAttempt ();
212- }
213159 }
214160
215161 log .info ("Shutdown invoked, exiting event stream listener" );
216162 }
217-
218- private static void logExceptions (
219- Level logLevel , Throwable streamException , Exception metadataException , long retryDelay ) {
220- if (streamException != null ) {
221- log .atLevel (logLevel )
222- .setCause (streamException )
223- .log ("Error initializing stream, retrying in {}ms" , retryDelay );
224- }
225-
226- if (metadataException != null ) {
227- log .atLevel (logLevel )
228- .setCause (metadataException )
229- .log ("Error initializing metadata, retrying in {}ms" , retryDelay );
230- }
231- }
232163}
0 commit comments