11package dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .grpc ;
22
33import dev .openfeature .contrib .providers .flagd .FlagdOptions ;
4- import dev .openfeature .contrib .providers .flagd .resolver .common .ChannelBuilder ;
5- import dev .openfeature .contrib .providers .flagd .resolver .common . backoff . GrpcStreamConnectorBackoffService ;
4+ import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
5+ import dev .openfeature .contrib .providers .flagd .resolver .grpc . GrpcConnector ;
66import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .Connector ;
77import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayload ;
88import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayloadType ;
99import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc ;
10- import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc .FlagSyncServiceBlockingStub ;
11- import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc .FlagSyncServiceStub ;
1210import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataRequest ;
1311import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataResponse ;
1412import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsRequest ;
1513import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsResponse ;
1614import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
1715import io .grpc .Context ;
1816import io .grpc .Context .CancellableContext ;
19- import io .grpc .ManagedChannel ;
2017import java .util .concurrent .BlockingQueue ;
2118import java .util .concurrent .LinkedBlockingQueue ;
2219import java .util .concurrent .TimeUnit ;
2320import java .util .concurrent .atomic .AtomicBoolean ;
21+ import java .util .function .Consumer ;
2422import lombok .extern .slf4j .Slf4j ;
25- import org .slf4j .event .Level ;
2623
2724/**
2825 * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
@@ -36,42 +33,31 @@ public class GrpcStreamConnector implements Connector {
3633
3734 private final AtomicBoolean shutdown = new AtomicBoolean (false );
3835 private final BlockingQueue <QueuePayload > blockingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
39- private final ManagedChannel channel ;
40- private final FlagSyncServiceStub serviceStub ;
41- private final FlagSyncServiceBlockingStub serviceBlockingStub ;
4236 private final int deadline ;
43- private final int streamDeadlineMs ;
4437 private final String selector ;
45- private final int retryBackoffMillis ;
38+ private final GrpcConnector <
39+ FlagSyncServiceGrpc .FlagSyncServiceStub , FlagSyncServiceGrpc .FlagSyncServiceBlockingStub >
40+ grpcConnector ;
41+ private final LinkedBlockingQueue <GrpcResponseModel > streamReceiver ;
4642
47- /**
48- * Construct a new GrpcStreamConnector.
49- *
50- * @param options flagd options
51- */
52- public GrpcStreamConnector (final FlagdOptions options ) {
53- channel = ChannelBuilder .nettyChannel (options );
54- serviceStub = FlagSyncServiceGrpc .newStub (channel );
55- serviceBlockingStub = FlagSyncServiceGrpc .newBlockingStub (channel );
43+ public GrpcStreamConnector (final FlagdOptions options , Consumer <ConnectionEvent > onConnectionEvent ) {
5644 deadline = options .getDeadline ();
57- streamDeadlineMs = options .getStreamDeadlineMs ();
5845 selector = options .getSelector ();
59- retryBackoffMillis = options .getRetryBackoffMs ();
46+ streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
47+ grpcConnector = new GrpcConnector <>(
48+ options ,
49+ FlagSyncServiceGrpc ::newStub ,
50+ FlagSyncServiceGrpc ::newBlockingStub ,
51+ onConnectionEvent ,
52+ stub -> stub .syncFlags (SyncFlagsRequest .getDefaultInstance (), new GrpcStreamHandler (streamReceiver )));
6053 }
6154
6255 /** Initialize gRPC stream connector. */
63- public void init () {
56+ public void init () throws Exception {
57+ grpcConnector .initialize ();
6458 Thread listener = new Thread (() -> {
6559 try {
66- observeEventStream (
67- blockingQueue ,
68- shutdown ,
69- serviceStub ,
70- serviceBlockingStub ,
71- selector ,
72- deadline ,
73- streamDeadlineMs ,
74- retryBackoffMillis );
60+ observeEventStream (blockingQueue , shutdown , selector , deadline );
7561 } catch (InterruptedException e ) {
7662 log .warn ("gRPC event stream interrupted, flag configurations are stale" , e );
7763 Thread .currentThread ().interrupt ();
@@ -96,37 +82,17 @@ public void shutdown() throws InterruptedException {
9682 if (shutdown .getAndSet (true )) {
9783 return ;
9884 }
99-
100- try {
101- if (this .channel != null && !this .channel .isShutdown ()) {
102- this .channel .shutdown ();
103- this .channel .awaitTermination (this .deadline , TimeUnit .MILLISECONDS );
104- }
105- } finally {
106- if (this .channel != null && !this .channel .isShutdown ()) {
107- this .channel .shutdownNow ();
108- this .channel .awaitTermination (this .deadline , TimeUnit .MILLISECONDS );
109- log .warn (String .format ("Unable to shut down channel by %d deadline" , this .deadline ));
110- }
111- }
85+ this .grpcConnector .shutdown ();
11286 }
11387
11488 /** Contains blocking calls, to be used concurrently. */
115- static void observeEventStream (
89+ void observeEventStream (
11690 final BlockingQueue <QueuePayload > writeTo ,
11791 final AtomicBoolean shutdown ,
118- final FlagSyncServiceStub serviceStub ,
119- final FlagSyncServiceBlockingStub serviceBlockingStub ,
12092 final String selector ,
121- final int deadline ,
122- final int streamDeadlineMs ,
123- int retryBackoffMillis )
93+ final int deadline )
12494 throws InterruptedException {
12595
126- final BlockingQueue <GrpcResponseModel > streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
127- final GrpcStreamConnectorBackoffService backoffService =
128- new GrpcStreamConnectorBackoffService (retryBackoffMillis );
129-
13096 log .info ("Initializing sync stream observer" );
13197
13298 while (!shutdown .get ()) {
@@ -143,15 +109,10 @@ static void observeEventStream(
143109 }
144110
145111 try (CancellableContext context = Context .current ().withCancellation ()) {
146- FlagSyncServiceStub localServiceStub = serviceStub ;
147- if (streamDeadlineMs > 0 ) {
148- localServiceStub = localServiceStub .withDeadlineAfter (streamDeadlineMs , TimeUnit .MILLISECONDS );
149- }
150-
151- localServiceStub .syncFlags (syncRequest .build (), new GrpcStreamHandler (streamReceiver ));
152112
153113 try {
154- metadataResponse = serviceBlockingStub
114+ metadataResponse = grpcConnector
115+ .getResolver ()
155116 .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS )
156117 .getMetadata (metadataRequest .build ());
157118 } catch (Exception e ) {
@@ -164,27 +125,18 @@ static void observeEventStream(
164125
165126 while (!shutdown .get ()) {
166127 final GrpcResponseModel response = streamReceiver .take ();
167-
168128 if (response .isComplete ()) {
169129 log .info ("Sync stream completed" );
170- // The stream is complete, this isn't really an error but we should try to
130+ // The stream is complete, this isn't really an error, but we should try to
171131 // reconnect
172132 break ;
173133 }
174134
175135 Throwable streamException = response .getError ();
176136 if (streamException != null || metadataException != null ) {
177- long retryDelay = backoffService .getCurrentBackoffMillis ();
178-
179- // if we are in silent recover mode, we should not expose the error to the client
180- if (backoffService .shouldRetrySilently ()) {
181- logExceptions (Level .INFO , streamException , metadataException , retryDelay );
182- } else {
183- logExceptions (Level .ERROR , streamException , metadataException , retryDelay );
184- if (!writeTo .offer (new QueuePayload (
185- QueuePayloadType .ERROR , "Error from stream or metadata" , metadataResponse ))) {
186- log .error ("Failed to convey ERROR status, queue is full" );
187- }
137+ if (!writeTo .offer (new QueuePayload (
138+ QueuePayloadType .ERROR , "Error from stream or metadata" , metadataResponse ))) {
139+ log .error ("Failed to convey ERROR status, queue is full" );
188140 }
189141
190142 // close the context to cancel the stream in case just the metadata call failed
@@ -199,34 +151,10 @@ static void observeEventStream(
199151 if (!writeTo .offer (new QueuePayload (QueuePayloadType .DATA , data , metadataResponse ))) {
200152 log .error ("Stream writing failed" );
201153 }
202-
203- // reset backoff if we succeeded in a retry attempt
204- backoffService .reset ();
205154 }
206155 }
207-
208- // check for shutdown and avoid sleep
209- if (!shutdown .get ()) {
210- log .debug ("Stream failed, retrying in {}ms" , backoffService .getCurrentBackoffMillis ());
211- backoffService .waitUntilNextAttempt ();
212- }
213156 }
214157
215158 log .info ("Shutdown invoked, exiting event stream listener" );
216159 }
217-
218- private static void logExceptions (
219- Level logLevel , Throwable streamException , Exception metadataException , long retryDelay ) {
220- if (streamException != null ) {
221- log .atLevel (logLevel )
222- .setCause (streamException )
223- .log ("Error initializing stream, retrying in {}ms" , retryDelay );
224- }
225-
226- if (metadataException != null ) {
227- log .atLevel (logLevel )
228- .setCause (metadataException )
229- .log ("Error initializing metadata, retrying in {}ms" , retryDelay );
230- }
231- }
232160}
0 commit comments