Skip to content

Commit a891ba8

Browse files
Copilotaepfligemini-code-assist[bot]toddbaert
authored
feat(flagd): Add flagd-selector gRPC metadata header to in-process service. (#790)
Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com> Signed-off-by: Todd Baert <todd.baert@dynatrace.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: aepfli <9987394+aepfli@users.noreply.github.com> Co-authored-by: Simon Schrottner <simon.schrottner@dynatrace.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 0999105 commit a891ba8

File tree

8 files changed

+387
-23
lines changed

8 files changed

+387
-23
lines changed

providers/flagd/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,25 @@ The flagd provider currently support following flag evaluation metadata,
189189
| `scope` | string | "selector" set for the associated source in flagd |
190190
| `providerID` | string | "providerID" set for the associated source in flagd |
191191

192+
## Selector Handling
193+
194+
When using the in-process resolver with a gRPC sync source, the provider supports filtering flag configurations using a selector. The selector can be configured using the `WithSelector` option or the `FLAGD_SOURCE_SELECTOR` environment variable.
195+
196+
### Header-based Selector (Recommended)
197+
198+
The provider now sends the selector as a `flagd-selector` gRPC metadata header when communicating with flagd sync services. This approach is consistent with how selectors are handled across all flagd services (sync, evaluation, and OFREP).
199+
200+
```go
201+
provider, err := flagd.NewProvider(
202+
flagd.WithInProcessResolver(),
203+
flagd.WithSelector("source=database,app=myapp"),
204+
)
205+
```
206+
207+
### Backward Compatibility
208+
209+
For backward compatibility with older flagd versions, the provider continues to include the selector in the gRPC request fields alongside the header. This dual approach ensures compatibility during the migration period until all flagd instances are updated.
210+
192211
## Logging
193212

194213
If not configured, logging falls back to the standard Go log package at error level only.

providers/flagd/pkg/configuration.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ type ResolverType string
1818

1919
// Naming and defaults must comply with flagd environment variables
2020
const (
21+
// DefaultRetryBackoffMs is the default initial backoff duration for stream retry
22+
DefaultRetryBackoffMs = 1000
23+
// DefaultRetryBackoffMaxMs is the default maximum backoff duration for stream retry
24+
DefaultRetryBackoffMaxMs = 120000
2125
defaultMaxCacheSize int = 1000
2226
defaultRpcPort uint16 = 8013
2327
defaultInProcessPort uint16 = 8015
@@ -27,8 +31,6 @@ const (
2731
defaultHost = "localhost"
2832
defaultResolver = rpc
2933
defaultGracePeriod = 5
30-
defaultRetryBackoffMs = 1000
31-
defaultRetryBackoffMaxMs = 120000
3234
defaultFatalStatusCodes = ""
3335

3436
rpc ResolverType = "rpc"
@@ -90,8 +92,8 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
9092
Resolver: defaultResolver,
9193
Tls: defaultTLS,
9294
RetryGracePeriod: defaultGracePeriod,
93-
RetryBackoffMs: defaultRetryBackoffMs,
94-
RetryBackoffMaxMs: defaultRetryBackoffMaxMs,
95+
RetryBackoffMs: DefaultRetryBackoffMs,
96+
RetryBackoffMaxMs: DefaultRetryBackoffMaxMs,
9597
}
9698

9799
p.updateFromEnvVar()
@@ -222,8 +224,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
222224
}
223225

224226
cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
225-
cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, defaultRetryBackoffMs, cfg.log)
226-
cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, defaultRetryBackoffMaxMs, cfg.log)
227+
cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, DefaultRetryBackoffMs, cfg.log)
228+
cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, DefaultRetryBackoffMaxMs, cfg.log)
227229

228230
var fatalStatusCodes string
229231
if envVal := os.Getenv(flagdFatalStatusCodesVariableName); envVal != "" {

providers/flagd/pkg/service/in_process/grpc_config.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@ package process
33
import (
44
"encoding/json"
55
"fmt"
6-
"google.golang.org/grpc/codes"
76
"time"
7+
8+
"google.golang.org/grpc/codes"
89
)
910

1011
const (
11-
// Default timeouts and retry intervals
12+
// Default timeouts for keepalive settings
1213
defaultKeepaliveTime = 30 * time.Second
1314
defaultKeepaliveTimeout = 5 * time.Second
15+
// Default retry intervals per https://flagd.dev/reference/specifications/providers/#configuration
16+
DefaultRetryBackoffMs = 1000 // 1 second
17+
DefaultRetryBackoffMaxMs = 120000 // 120 seconds
1418
)
1519

1620
type RetryPolicy struct {
@@ -22,6 +26,25 @@ type RetryPolicy struct {
2226
}
2327

2428
func (g *Sync) buildRetryPolicy() string {
29+
// Use default values if not configured (per https://flagd.dev/reference/specifications/providers/#configuration)
30+
initialBackoffMs := g.RetryBackOffMs
31+
if initialBackoffMs <= 0 {
32+
initialBackoffMs = DefaultRetryBackoffMs
33+
}
34+
35+
maxBackoffMs := g.RetryBackOffMaxMs
36+
if maxBackoffMs <= 0 {
37+
maxBackoffMs = DefaultRetryBackoffMaxMs
38+
}
39+
40+
// Format durations for gRPC service config (requires seconds-only format like "1s", "0.1s", "120s")
41+
initialDur := time.Duration(initialBackoffMs) * time.Millisecond
42+
maxDur := time.Duration(maxBackoffMs) * time.Millisecond
43+
44+
// Convert to seconds and format as gRPC expects (no compound formats like "2m0s")
45+
initialBackoff := fmt.Sprint(initialDur.Seconds()) + "s"
46+
maxBackoff := fmt.Sprint(maxDur.Seconds()) + "s"
47+
2548
var policy = map[string]interface{}{
2649
"methodConfig": []map[string]interface{}{
2750
{
@@ -30,8 +53,8 @@ func (g *Sync) buildRetryPolicy() string {
3053
},
3154
"retryPolicy": RetryPolicy{
3255
MaxAttempts: 3,
33-
InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(),
34-
MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(),
56+
InitialBackoff: initialBackoff,
57+
MaxBackoff: maxBackoff,
3558
BackoffMultiplier: 2.0,
3659
RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"},
3760
},

providers/flagd/pkg/service/in_process/grpc_config_test.go

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package process
22

33
import (
44
"encoding/json"
5+
"strings"
6+
"testing"
7+
58
"github.com/open-feature/flagd/core/pkg/logger"
69
"go.uber.org/zap"
710
"google.golang.org/grpc/codes"
8-
"strings"
9-
"testing"
1011
)
1112

1213
func TestBuildRetryPolicy(t *testing.T) {
@@ -37,11 +38,11 @@ func TestBuildRetryPolicy(t *testing.T) {
3738
if retryPolicy["MaxAttempts"].(float64) != 3 {
3839
t.Errorf("MaxAttempts = %v; want 3", retryPolicy["MaxAttempts"])
3940
}
40-
if retryPolicy["InitialBackoff"].(string) != "100ms" {
41-
t.Errorf("InitialBackoff = %v; want 100ms", retryPolicy["InitialBackoff"])
41+
if retryPolicy["InitialBackoff"].(string) != "0.1s" {
42+
t.Errorf("InitialBackoff = %v; want 0.1s", retryPolicy["InitialBackoff"])
4243
}
43-
if retryPolicy["MaxBackoff"].(string) != "500ms" {
44-
t.Errorf("MaxBackoff = %v; want 500ms", retryPolicy["MaxBackoff"])
44+
if retryPolicy["MaxBackoff"].(string) != "0.5s" {
45+
t.Errorf("MaxBackoff = %v; want 0.5s", retryPolicy["MaxBackoff"])
4546
}
4647
if retryPolicy["BackoffMultiplier"].(float64) != 2.0 {
4748
t.Errorf("BackoffMultiplier = %v; want 2.0", retryPolicy["BackoffMultiplier"])
@@ -58,17 +59,87 @@ func TestBuildRetryPolicy(t *testing.T) {
5859
if !strings.Contains(result, `"MaxAttempts":3`) {
5960
t.Error("Result does not contain MaxAttempts")
6061
}
61-
if !strings.Contains(result, `"InitialBackoff":"100ms"`) {
62+
if !strings.Contains(result, `"InitialBackoff":"0.1s"`) {
6263
t.Error("Result does not contain InitialBackoff")
6364
}
64-
if !strings.Contains(result, `"MaxBackoff":"500ms"`) {
65+
if !strings.Contains(result, `"MaxBackoff":"0.5s"`) {
6566
t.Error("Result does not contain MaxBackoff")
6667
}
6768
if !strings.Contains(result, `"RetryableStatusCodes":["UNKNOWN","UNAVAILABLE"]`) {
6869
t.Error("Result does not contain RetryableStatusCodes")
6970
}
7071
}
7172

73+
// TestBuildRetryPolicyDefaults verifies that default values are applied per spec
74+
func TestBuildRetryPolicyDefaults(t *testing.T) {
75+
g := &Sync{
76+
RetryBackOffMs: 0, // Should use default 1000ms
77+
RetryBackOffMaxMs: 0, // Should use default 120000ms
78+
}
79+
80+
result := g.buildRetryPolicy()
81+
82+
// Unmarshal to check structure
83+
var policy map[string]interface{}
84+
if err := json.Unmarshal([]byte(result), &policy); err != nil {
85+
t.Fatalf("Failed to unmarshal result: %v", err)
86+
}
87+
88+
methodConfig, ok := policy["methodConfig"].([]interface{})
89+
if !ok || len(methodConfig) == 0 {
90+
t.Fatalf("methodConfig missing or empty")
91+
}
92+
93+
config := methodConfig[0].(map[string]interface{})
94+
retryPolicy, ok := config["retryPolicy"].(map[string]interface{})
95+
if !ok {
96+
t.Fatalf("retryPolicy missing")
97+
}
98+
99+
// time.Duration.String() formats milliseconds as "1s", "120s", etc.
100+
if retryPolicy["InitialBackoff"].(string) != "1s" {
101+
t.Errorf("InitialBackoff = %v; want 1s (default)", retryPolicy["InitialBackoff"])
102+
}
103+
if retryPolicy["MaxBackoff"].(string) != "120s" {
104+
t.Errorf("MaxBackoff = %v; want 120s (gRPC format for 120000ms)", retryPolicy["MaxBackoff"])
105+
}
106+
}
107+
108+
// TestBuildRetryPolicyFractionalSeconds verifies fractional second durations are formatted correctly
109+
func TestBuildRetryPolicyFractionalSeconds(t *testing.T) {
110+
g := &Sync{
111+
RetryBackOffMs: 1500, // 1.5 seconds
112+
RetryBackOffMaxMs: 2500, // 2.5 seconds
113+
}
114+
115+
result := g.buildRetryPolicy()
116+
117+
// Unmarshal to check structure
118+
var policy map[string]interface{}
119+
if err := json.Unmarshal([]byte(result), &policy); err != nil {
120+
t.Fatalf("Failed to unmarshal result: %v", err)
121+
}
122+
123+
methodConfig, ok := policy["methodConfig"].([]interface{})
124+
if !ok || len(methodConfig) == 0 {
125+
t.Fatalf("methodConfig missing or empty")
126+
}
127+
128+
config := methodConfig[0].(map[string]interface{})
129+
retryPolicy, ok := config["retryPolicy"].(map[string]interface{})
130+
if !ok {
131+
t.Fatalf("retryPolicy missing")
132+
}
133+
134+
// Verify fractional seconds are preserved in gRPC format
135+
if retryPolicy["InitialBackoff"].(string) != "1.5s" {
136+
t.Errorf("InitialBackoff = %v; want 1.5s", retryPolicy["InitialBackoff"])
137+
}
138+
if retryPolicy["MaxBackoff"].(string) != "2.5s" {
139+
t.Errorf("MaxBackoff = %v; want 2.5s", retryPolicy["MaxBackoff"])
140+
}
141+
}
142+
72143
func TestSync_initNonRetryableStatusCodesSet(t *testing.T) {
73144
tests := []struct {
74145
name string
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package process
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/grpc"
7+
"google.golang.org/grpc/metadata"
8+
)
9+
10+
const SelectorMetadataKey = "flagd-selector"
11+
12+
// selectorUnaryInterceptor adds the flagd-selector metadata header to unary gRPC calls
13+
func selectorUnaryInterceptor(selector string) grpc.UnaryClientInterceptor {
14+
return func(
15+
ctx context.Context,
16+
method string,
17+
req, reply interface{},
18+
cc *grpc.ClientConn,
19+
invoker grpc.UnaryInvoker,
20+
opts ...grpc.CallOption,
21+
) error {
22+
if selector != "" {
23+
ctx = metadata.AppendToOutgoingContext(ctx, SelectorMetadataKey, selector)
24+
}
25+
return invoker(ctx, method, req, reply, cc, opts...)
26+
}
27+
}
28+
29+
// selectorStreamInterceptor adds the flagd-selector metadata header to streaming gRPC calls
30+
func selectorStreamInterceptor(selector string) grpc.StreamClientInterceptor {
31+
return func(
32+
ctx context.Context,
33+
desc *grpc.StreamDesc,
34+
cc *grpc.ClientConn,
35+
method string,
36+
streamer grpc.Streamer,
37+
opts ...grpc.CallOption,
38+
) (grpc.ClientStream, error) {
39+
if selector != "" {
40+
ctx = metadata.AppendToOutgoingContext(ctx, SelectorMetadataKey, selector)
41+
}
42+
return streamer(ctx, desc, cc, method, opts...)
43+
}
44+
}

providers/flagd/pkg/service/in_process/grpc_sync.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,29 @@ func (g *Sync) Init(ctx context.Context) error {
8181

8282
// createConnection creates and configures the gRPC connection
8383
func (g *Sync) createConnection() (*grpc.ClientConn, error) {
84+
85+
var grpcInterceptorDialOptions []grpc.DialOption
86+
if g.Selector != "" {
87+
grpcInterceptorDialOptions = append(grpcInterceptorDialOptions,
88+
grpc.WithChainUnaryInterceptor(selectorUnaryInterceptor(g.Selector)),
89+
grpc.WithChainStreamInterceptor(selectorStreamInterceptor(g.Selector)),
90+
)
91+
}
92+
8493
if len(g.GrpcDialOptionsOverride) > 0 {
8594
g.Logger.Debug("using provided gRPC DialOptions override")
86-
return grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...)
95+
dialOptions := make([]grpc.DialOption, 0, len(g.GrpcDialOptionsOverride)+len(grpcInterceptorDialOptions))
96+
dialOptions = append(dialOptions, g.GrpcDialOptionsOverride...)
97+
dialOptions = append(dialOptions, grpcInterceptorDialOptions...)
98+
return grpc.NewClient(g.URI, dialOptions...)
8799
}
88100

89101
// Build standard dial options
90102
dialOptions, err := g.buildDialOptions()
91103
if err != nil {
92104
return nil, fmt.Errorf("failed to build dial options: %w", err)
93105
}
106+
dialOptions = append(dialOptions, grpcInterceptorDialOptions...)
94107

95108
return grpc.NewClient(g.URI, dialOptions...)
96109
}

providers/flagd/pkg/service/in_process/service_grpc_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package process
22

33
import (
4-
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
5-
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
64
"context"
75
"fmt"
8-
"github.com/open-feature/go-sdk/openfeature"
9-
"google.golang.org/grpc"
106
"log"
117
"net"
128
"testing"
139
"time"
10+
11+
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
12+
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
13+
"github.com/open-feature/go-sdk/openfeature"
14+
"google.golang.org/grpc"
1415
)
1516

1617
// shared flag for tests

0 commit comments

Comments
 (0)