Skip to content

Commit 4ce8f57

Browse files
committed
fix(flagd): Simplified ChannelConnector
Signed-off-by: Guido Breitenhuber <[email protected]>
1 parent 10bf99e commit 4ce8f57

File tree

6 files changed

+21
-37
lines changed

6 files changed

+21
-37
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,9 @@
1616
/**
1717
* A generic GRPC connector that manages connection states, reconnection logic, and event streaming for
1818
* GRPC services.
19-
*
20-
* @param <T> the type of the asynchronous stub for the GRPC service
21-
* @param <K> the type of the blocking stub for the GRPC service
2219
*/
2320
@Slf4j
24-
public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {
21+
public class ChannelConnector {
2522

2623
/**
2724
* The GRPC managed channel for managing the underlying GRPC connection.
@@ -48,22 +45,11 @@ public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlock
4845
*/
4946
public ChannelConnector(
5047
final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent, ManagedChannel channel) {
51-
5248
this.channel = channel;
5349
this.deadline = options.getDeadline();
5450
this.onConnectionEvent = onConnectionEvent;
5551
}
5652

57-
/**
58-
* Constructs a {@code ChannelConnector} instance for testing purposes.
59-
*
60-
* @param options the configuration options for the GRPC connection
61-
* @param onConnectionEvent a consumer to handle connection events
62-
*/
63-
public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent) {
64-
this(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
65-
}
66-
6753
/**
6854
* Initializes the GRPC connection by waiting for the channel to be ready and monitoring its state.
6955
*

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.Struct;
44
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
5+
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
56
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
67
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
78
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
@@ -17,7 +18,6 @@
1718
import dev.openfeature.sdk.Awaitable;
1819
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1920
import io.grpc.Status;
20-
import io.grpc.StatusException;
2121
import io.grpc.StatusRuntimeException;
2222
import io.grpc.stub.StreamObserver;
2323
import java.util.concurrent.BlockingQueue;
@@ -43,10 +43,10 @@ public class SyncStreamQueueSource implements QueueSource {
4343
private final String selector;
4444
private final String providerId;
4545
private final boolean syncMetadataDisabled;
46-
private final ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> channelConnector;
46+
private final ChannelConnector channelConnector;
4747
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
48-
private final FlagSyncServiceStub stub;
49-
private final FlagSyncServiceBlockingStub blockingStub;
48+
private final FlagSyncServiceStub flagSyncStub;
49+
private final FlagSyncServiceBlockingStub metadataStub;
5050

5151
/**
5252
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -57,26 +57,25 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
5757
selector = options.getSelector();
5858
providerId = options.getProviderId();
5959
syncMetadataDisabled = options.isSyncMetadataDisabled();
60-
channelConnector = new ChannelConnector<>(options, onConnectionEvent);
61-
this.stub = FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
62-
this.blockingStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
63-
.withWaitForReady();
60+
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
61+
flagSyncStub = FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
62+
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel()).withWaitForReady();
6463
}
6564

6665
// internal use only
6766
protected SyncStreamQueueSource(
6867
final FlagdOptions options,
69-
ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> connectorMock,
68+
ChannelConnector connectorMock,
7069
FlagSyncServiceStub stubMock,
7170
FlagSyncServiceBlockingStub blockingStubMock) {
7271
streamDeadline = options.getStreamDeadlineMs();
7372
deadline = options.getDeadline();
7473
selector = options.getSelector();
7574
providerId = options.getProviderId();
7675
channelConnector = connectorMock;
77-
stub = stubMock;
76+
flagSyncStub = stubMock;
7877
syncMetadataDisabled = options.isSyncMetadataDisabled();
79-
blockingStub = blockingStubMock;
78+
metadataStub = blockingStubMock;
8079
}
8180

8281
/** Initialize sync stream connector. */
@@ -141,7 +140,7 @@ private Struct getMetadata() {
141140
return null;
142141
}
143142

144-
FlagSyncServiceBlockingStub localStub = blockingStub;
143+
FlagSyncServiceBlockingStub localStub = metadataStub;
145144

146145
if (deadline > 0) {
147146
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
@@ -167,7 +166,7 @@ private Struct getMetadata() {
167166
}
168167

169168
private void syncFlags(SyncStreamObserver streamObserver) {
170-
FlagSyncServiceStub localStub = stub; // don't mutate the stub
169+
FlagSyncServiceStub localStub = flagSyncStub; // don't mutate the stub
171170
if (streamDeadline > 0) {
172171
localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS);
173172
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import dev.openfeature.contrib.providers.flagd.Config;
1010
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
1111
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
12+
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
1213
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
1314
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
1415
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
@@ -59,7 +60,7 @@
5960
public final class RpcResolver implements Resolver {
6061
private static final int QUEUE_SIZE = 5;
6162
private final AtomicBoolean shutdown = new AtomicBoolean(false);
62-
private final ChannelConnector<ServiceStub, ServiceBlockingStub> connector;
63+
private final ChannelConnector connector;
6364
private final Cache cache;
6465
private final ResolveStrategy strategy;
6566
private final FlagdOptions options;
@@ -83,7 +84,7 @@ public RpcResolver(
8384
this.strategy = ResolveFactory.getStrategy(options);
8485
this.options = options;
8586
incomingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
86-
this.connector = new ChannelConnector<>(options, onProviderEvent);
87+
this.connector = new ChannelConnector(options, onProviderEvent, ChannelBuilder.nettyChannel(options));
8788
this.onProviderEvent = onProviderEvent;
8889
this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady();
8990
this.blockingStub =
@@ -97,7 +98,7 @@ protected RpcResolver(
9798
final Consumer<FlagdProviderEvent> onProviderEvent,
9899
ServiceStub mockStub,
99100
ServiceBlockingStub mockBlockingStub,
100-
ChannelConnector<ServiceStub, ServiceBlockingStub> connector) {
101+
ChannelConnector connector) {
101102
this.cache = cache;
102103
this.strategy = ResolveFactory.getStrategy(options);
103104
this.options = options;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@ void whenShuttingDownGrpcConnectorConsumerReceivesDisconnectedEvent() throws Exc
9494
sync.countDown();
9595
};
9696

97-
ChannelConnector<ServiceGrpc.ServiceStub, ServiceGrpc.ServiceBlockingStub> instance =
98-
new ChannelConnector<>(FlagdOptions.builder().build(), testConsumer, testChannel);
97+
ChannelConnector instance = new ChannelConnector(FlagdOptions.builder().build(), testConsumer, testChannel);
9998

10099
instance.initialize();
101100
// when shutting grpc connector
@@ -122,8 +121,7 @@ void testMonitorChannelState(ConnectivityState state) throws Exception {
122121

123122
Consumer<FlagdProviderEvent> testConsumer = spy(Consumer.class);
124123

125-
ChannelConnector<ServiceGrpc.ServiceStub, ServiceGrpc.ServiceBlockingStub> instance =
126-
new ChannelConnector<>(FlagdOptions.builder().build(), testConsumer, channel);
124+
ChannelConnector instance = new ChannelConnector(FlagdOptions.builder().build(), testConsumer, channel);
127125

128126
instance.initialize();
129127

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.mockito.stubbing.Answer;
3333

3434
class SyncStreamQueueSourceTest {
35-
private ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> mockConnector;
35+
private ChannelConnector mockConnector;
3636
private FlagSyncServiceBlockingStub blockingStub;
3737
private FlagSyncServiceStub stub;
3838
private StreamObserver<SyncFlagsResponse> observer;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.mockito.stubbing.Answer;
2828

2929
class RpcResolverTest {
30-
private ChannelConnector<ServiceStub, ServiceBlockingStub> mockConnector;
30+
private ChannelConnector mockConnector;
3131
private ServiceBlockingStub blockingStub;
3232
private ServiceStub stub;
3333
private QueueingStreamObserver<EventStreamResponse> observer;

0 commit comments

Comments
 (0)