Skip to content

Commit 630d470

Browse files
Copilotaepflitoddbaert
authored
feat(flagd): Implement header-based selector for in-process sync stream connection #1622 (#1623)
Signed-off-by: Simon Schrottner <[email protected]> Signed-off-by: Todd Baert <[email protected]> Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: aepfli <[email protected]> Co-authored-by: Simon Schrottner <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent ba277bf commit 630d470

File tree

5 files changed

+136
-34
lines changed

5 files changed

+136
-34
lines changed

providers/flagd/README.md

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,46 @@ FlagdProvider flagdProvider = new FlagdProvider(
4747

4848
In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).
4949

50+
#### Selector filtering (In-process mode only)
51+
52+
The `selector` option allows filtering flag configurations from flagd based on source identifiers when using the in-process resolver. This is useful when flagd is configured with multiple flag sources and you want to sync only a specific subset.
53+
54+
##### Usage
55+
56+
To use selector filtering, simply configure the `selector` option when creating the provider:
57+
58+
```java
59+
FlagdProvider flagdProvider = new FlagdProvider(
60+
FlagdOptions.builder()
61+
.resolverType(Config.Resolver.IN_PROCESS)
62+
.selector("source=my-app")
63+
.build());
64+
```
65+
66+
Or via environment variable:
67+
```bash
68+
export FLAGD_SOURCE_SELECTOR="source=my-app"
69+
```
70+
71+
##### Implementation details
72+
73+
> [!IMPORTANT]
74+
> **Selector normalization (flagd issue #1814)**
75+
>
76+
> As part of [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814), the flagd project is normalizing selector handling across all services to use the `flagd-selector` gRPC metadata header.
77+
>
78+
> **Current implementation:**
79+
> - The Java SDK **automatically passes the selector via the `flagd-selector` header** (preferred approach)
80+
> - For backward compatibility with older flagd versions, the selector is **also sent in the request body**
81+
> - Both methods work with current flagd versions
82+
> - In a future major version of flagd, the request body selector field may be removed
83+
>
84+
> **No migration needed:**
85+
>
86+
> Users do not need to make any code changes. The SDK handles selector normalization automatically.
87+
88+
For more details on selector normalization, see the [flagd selector normalization issue](https://github.com/open-feature/flagd/issues/1814).
89+
5090
#### Sync-metadata
5191

5292
To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata).
@@ -106,30 +146,33 @@ variables.
106146

107147
Given below are the supported configurations:
108148

109-
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
110-
|-----------------------|------------------------------------------------------------------------|--------------------------|-------------------------------|-------------------------|
111-
| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | |
112-
| host | FLAGD_HOST | String | localhost | rpc & in-process |
113-
| port | FLAGD_PORT (rpc), FLAGD_SYNC_PORT (in-process, FLAGD_PORT as fallback) | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
114-
| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process |
115-
| tls | FLAGD_TLS | boolean | false | rpc & in-process |
116-
| defaultAuthority | FLAGD_DEFAULT_AUTHORITY | String | null | rpc & in-process |
117-
| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process |
118-
| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
119-
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file |
120-
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
121-
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
122-
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process |
123-
| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process |
124-
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
125-
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
126-
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
127-
| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
128-
| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file |
129-
| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file |
149+
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
150+
| --------------------- | ---------------------------------------------------------------------- | ------------------------ | ----------------------------- | ------------------------------------------------------------------------------- |
151+
| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | |
152+
| host | FLAGD_HOST | String | localhost | rpc & in-process |
153+
| port | FLAGD_PORT (rpc), FLAGD_SYNC_PORT (in-process, FLAGD_PORT as fallback) | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
154+
| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process |
155+
| tls | FLAGD_TLS | boolean | false | rpc & in-process |
156+
| defaultAuthority | FLAGD_DEFAULT_AUTHORITY | String | null | rpc & in-process |
157+
| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process |
158+
| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
159+
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file |
160+
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
161+
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
162+
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process (see [migration guidance](#selector-filtering-in-process-mode-only)) |
163+
| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process |
164+
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
165+
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
166+
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
167+
| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
168+
| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file |
169+
| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file |
130170

131171
> [!NOTE]
132172
> Some configurations are only applicable for RPC resolver.
173+
174+
> [!NOTE]
175+
> The `selector` option automatically uses the `flagd-selector` header (the preferred approach per [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814)) while maintaining backward compatibility with older flagd versions. See [Selector filtering](#selector-filtering-in-process-mode-only) for details.
133176
>
134177
135178
### Unix socket support
@@ -189,6 +232,9 @@ FlagdProvider flagdProvider = new FlagdProvider(
189232

190233
The `clientInterceptors` and `defaultAuthority` are meant for connection of the in-process resolver to a Sync API implementation on a host/port, that might require special credentials or headers.
191234

235+
> [!NOTE]
236+
> The SDK automatically handles the `flagd-selector` header when the `selector` option is configured. Custom interceptors are not needed for selector filtering. See [Selector filtering](#selector-filtering-in-process-mode-only) for details.
237+
192238
```java
193239
private static ClientInterceptor createHeaderInterceptor() {
194240
return new ClientInterceptor() {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ public class FlagdOptions {
124124
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
125125
/**
126126
* Selector to be used with flag sync gRPC contract.
127+
*
128+
* <p>The SDK automatically passes the selector via the {@code flagd-selector} gRPC metadata header
129+
* (the preferred approach per <a href="https://github.com/open-feature/flagd/issues/1814">flagd issue #1814</a>).
130+
* For backward compatibility with older flagd versions, the selector is also sent in the request body.
131+
*
132+
* <p>Only applicable for in-process resolver mode.
133+
*
134+
* @see <a href="https://github.com/open-feature/java-sdk-contrib/tree/main/providers/flagd#selector-filtering-in-process-mode-only">Selector filtering documentation</a>
127135
**/
128136
@Builder.Default
129137
private String selector = fallBackToEnvOrDefault(Config.SOURCE_SELECTOR_ENV_VAR_NAME, null);

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
44
import dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers.EnvoyResolverProvider;
55
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
6+
import io.grpc.CallOptions;
7+
import io.grpc.Channel;
8+
import io.grpc.ClientCall;
9+
import io.grpc.ClientInterceptor;
10+
import io.grpc.ForwardingClientCall;
611
import io.grpc.ManagedChannel;
12+
import io.grpc.Metadata;
13+
import io.grpc.MethodDescriptor;
714
import io.grpc.NameResolverRegistry;
815
import io.grpc.Status.Code;
916
import io.grpc.netty.GrpcSslContexts;
@@ -26,6 +33,10 @@
2633
/** gRPC channel builder helper. */
2734
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this")
2835
public class ChannelBuilder {
36+
37+
private static final Metadata.Key<String> FLAGD_SELECTOR_KEY =
38+
Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER);
39+
2940
/**
3041
* Controls retry (not-reconnection) policy for failed RPCs.
3142
*/
@@ -94,14 +105,19 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
94105
if (!Epoll.isAvailable()) {
95106
throw new IllegalStateException("unix socket cannot be used", Epoll.unavailabilityCause());
96107
}
97-
return NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath()))
108+
var channelBuilder = NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath()))
98109
.keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS)
99110
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
100111
.channelType(EpollDomainSocketChannel.class)
101112
.usePlaintext()
102113
.defaultServiceConfig(buildRetryPolicy(options))
103-
.enableRetry()
104-
.build();
114+
.enableRetry();
115+
116+
// add header-based selector interceptor if selector is provided
117+
if (options.getSelector() != null) {
118+
channelBuilder.intercept(createSelectorInterceptor(options.getSelector()));
119+
}
120+
return channelBuilder.build();
105121
}
106122

107123
// build a TCP socket
@@ -116,14 +132,14 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
116132
final String defaultTarget = String.format("%s:%s", options.getHost(), options.getPort());
117133
final String targetUri = isValidTargetUri(options.getTargetUri()) ? options.getTargetUri() : defaultTarget;
118134

119-
final NettyChannelBuilder builder =
135+
final NettyChannelBuilder channelBuilder =
120136
NettyChannelBuilder.forTarget(targetUri).keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS);
121137

122138
if (options.getDefaultAuthority() != null) {
123-
builder.overrideAuthority(options.getDefaultAuthority());
139+
channelBuilder.overrideAuthority(options.getDefaultAuthority());
124140
}
125141
if (options.getClientInterceptors() != null) {
126-
builder.intercept(options.getClientInterceptors());
142+
channelBuilder.intercept(options.getClientInterceptors());
127143
}
128144
if (options.isTls()) {
129145
SslContextBuilder sslContext = GrpcSslContexts.forClient();
@@ -135,17 +151,22 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
135151
}
136152
}
137153

138-
builder.sslContext(sslContext.build());
154+
channelBuilder.sslContext(sslContext.build());
139155
} else {
140-
builder.usePlaintext();
156+
channelBuilder.usePlaintext();
141157
}
142158

143159
// telemetry interceptor if option is provided
144160
if (options.getOpenTelemetry() != null) {
145-
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
161+
channelBuilder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
162+
}
163+
// add header-based selector interceptor if selector is provided
164+
if (options.getSelector() != null) {
165+
channelBuilder.intercept(createSelectorInterceptor(options.getSelector()));
146166
}
147167

148-
return builder.defaultServiceConfig(buildRetryPolicy(options))
168+
return channelBuilder
169+
.defaultServiceConfig(buildRetryPolicy(options))
149170
.enableRetry()
150171
.build();
151172
} catch (SSLException ssle) {
@@ -160,6 +181,30 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
160181
}
161182
}
162183

184+
/**
185+
* Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests.
186+
* This is the preferred approach for passing selectors as per flagd issue #1814.
187+
*
188+
* @param selector the selector value to pass in the header
189+
* @return a ClientInterceptor that adds the flagd-selector header
190+
*/
191+
private static ClientInterceptor createSelectorInterceptor(String selector) {
192+
return new ClientInterceptor() {
193+
@Override
194+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
195+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
196+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
197+
next.newCall(method, callOptions)) {
198+
@Override
199+
public void start(Listener<RespT> responseListener, Metadata headers) {
200+
headers.put(FLAGD_SELECTOR_KEY, selector);
201+
super.start(responseListener, headers);
202+
}
203+
};
204+
}
205+
};
206+
}
207+
163208
private static boolean isValidTargetUri(String targetUri) {
164209
if (targetUri == null) {
165210
return false;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
justification = "We need to expose the BlockingQueue to allow consumers to read from it")
3636
public class SyncStreamQueueSource implements QueueSource {
3737
private static final int QUEUE_SIZE = 5;
38-
3938
private final AtomicBoolean shutdown = new AtomicBoolean(false);
4039
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
4140
private final int streamDeadline;
@@ -253,6 +252,8 @@ private void syncFlags(SyncStreamObserver streamObserver) {
253252
}
254253

255254
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
255+
// Selector is now passed via header using ClientInterceptor (see constructor)
256+
// Keeping this for backward compatibility with older flagd versions
256257
if (this.selector != null) {
257258
syncRequest.setSelector(this.selector);
258259
}

0 commit comments

Comments
 (0)