Skip to content

Commit 42834a1

Browse files
Copilotaepfli
authored andcommitted
Implement header-based selector for sync stream connection
Co-authored-by: aepfli <9987394+aepfli@users.noreply.github.com>
1 parent 6988d88 commit 42834a1

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
44
import dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers.EnvoyResolverProvider;
55
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
6+
import io.grpc.CallOptions;
7+
import io.grpc.Channel;
8+
import io.grpc.ClientCall;
9+
import io.grpc.ClientInterceptor;
10+
import io.grpc.ForwardingClientCall;
611
import io.grpc.ManagedChannel;
12+
import io.grpc.Metadata;
13+
import io.grpc.MethodDescriptor;
714
import io.grpc.NameResolverRegistry;
815
import io.grpc.Status.Code;
916
import io.grpc.netty.GrpcSslContexts;
@@ -94,14 +101,19 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
94101
if (!Epoll.isAvailable()) {
95102
throw new IllegalStateException("unix socket cannot be used", Epoll.unavailabilityCause());
96103
}
97-
return NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath()))
104+
var channelBuilder = NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath()))
98105
.keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS)
99106
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
100107
.channelType(EpollDomainSocketChannel.class)
101108
.usePlaintext()
102109
.defaultServiceConfig(buildRetryPolicy(options))
103-
.enableRetry()
104-
.build();
110+
.enableRetry();
111+
112+
// add header-based selector interceptor if selector is provided
113+
if (options.getSelector() != null) {
114+
channelBuilder.intercept(createSelectorInterceptor(options.getSelector()));
115+
}
116+
return channelBuilder.build();
105117
}
106118

107119
// build a TCP socket
@@ -160,6 +172,30 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
160172
}
161173
}
162174

175+
/**
176+
* Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests.
177+
* This is the preferred approach for passing selectors as per flagd issue #1814.
178+
*
179+
* @param selector the selector value to pass in the header
180+
* @return a ClientInterceptor that adds the flagd-selector header
181+
*/
182+
private static ClientInterceptor createSelectorInterceptor(String selector) {
183+
return new ClientInterceptor() {
184+
@Override
185+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
186+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
187+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
188+
next.newCall(method, callOptions)) {
189+
@Override
190+
public void start(Listener<RespT> responseListener, Metadata headers) {
191+
headers.put(Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER), selector);
192+
super.start(responseListener, headers);
193+
}
194+
};
195+
}
196+
};
197+
}
198+
163199
private static boolean isValidTargetUri(String targetUri) {
164200
if (targetUri == null) {
165201
return false;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@
1616
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1717
import dev.openfeature.sdk.Awaitable;
1818
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19+
import io.grpc.CallOptions;
20+
import io.grpc.Channel;
21+
import io.grpc.ClientCall;
22+
import io.grpc.ClientInterceptor;
23+
import io.grpc.ForwardingClientCall;
24+
import io.grpc.Metadata;
25+
import io.grpc.MethodDescriptor;
1926
import io.grpc.Status;
2027
import io.grpc.StatusRuntimeException;
2128
import io.grpc.stub.StreamObserver;
@@ -253,6 +260,8 @@ private void syncFlags(SyncStreamObserver streamObserver) {
253260
}
254261

255262
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
263+
// Selector is now passed via header using ClientInterceptor (see constructor)
264+
// Keeping this for backward compatibility with older flagd versions
256265
if (this.selector != null) {
257266
syncRequest.setSelector(this.selector);
258267
}
@@ -266,6 +275,30 @@ private void syncFlags(SyncStreamObserver streamObserver) {
266275
streamObserver.done.await();
267276
}
268277

278+
/**
279+
* Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests.
280+
* This is the preferred approach for passing selectors as per flagd issue #1814.
281+
*
282+
* @param selector the selector value to pass in the header
283+
* @return a ClientInterceptor that adds the flagd-selector header
284+
*/
285+
private static ClientInterceptor createSelectorInterceptor(String selector) {
286+
return new ClientInterceptor() {
287+
@Override
288+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
289+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
290+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
291+
next.newCall(method, callOptions)) {
292+
@Override
293+
public void start(Listener<RespT> responseListener, Metadata headers) {
294+
headers.put(Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER), selector);
295+
super.start(responseListener, headers);
296+
}
297+
};
298+
}
299+
};
300+
}
301+
269302
private void enqueueError(String message) {
270303
enqueueError(outgoingQueue, message);
271304
}

0 commit comments

Comments
 (0)