Skip to content

Commit 340cc23

Browse files
committed
feat(flagd): add new context enrichment appraoch for in-process provider
Signed-off-by: Simon Schrottner <[email protected]> diff --git c/providers/flagd/pkg/provider.go i/providers/flagd/pkg/provider.go index 2b0084c..cf52f01 100644 --- c/providers/flagd/pkg/provider.go +++ i/providers/flagd/pkg/provider.go @@ -22,8 +22,8 @@ type Provider struct { service IService status of.State mtx parallel.RWMutex - - eventStream chan of.Event + hooks []of.Hook + eventStream chan of.Event } func NewProvider(opts ...ProviderOption) (*Provider, error) { @@ -38,6 +38,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { eventStream: make(chan of.Event), providerConfiguration: providerConfiguration, status: of.NotReadyState, + hooks: []of.Hook{}, } cacheService := cache.NewCacheService( @@ -60,7 +61,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { provider.providerConfiguration.log, provider.providerConfiguration.EventStreamConnectionMaxAttempts) } else if provider.providerConfiguration.Resolver == inProcess { - service = process.NewInProcessService(process.Configuration{ + inprocess_service := process.NewInProcessService(process.Configuration{ Host: provider.providerConfiguration.Host, Port: provider.providerConfiguration.Port, ProviderID: provider.providerConfiguration.ProviderId, @@ -73,6 +74,13 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri, GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride, }) + provider.hooks = append(provider.hooks, NewSyncContextHook( + func() *of.EvaluationContext { + evaluationContext := of.NewTargetlessEvaluationContext(inprocess_service.ContextValues) + return &evaluationContext + })) + service = inprocess_service + } else { service = process.NewInProcessService(process.Configuration{ OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath, @@ -145,7 +153,7 @@ func (p *Provider) EventChannel() <-chan of.Event { // Hooks flagd provider does not have any hooks, returns empty slice func (p *Provider) Hooks() []of.Hook { - return []of.Hook{} + return p.hooks } // Metadata returns value of Metadata (name of current service, exposed to openfeature sdk) diff --git c/providers/flagd/pkg/service/in_process/service.go i/providers/flagd/pkg/service/in_process/service.go index 468b1c6..4f1c2ee 100644 --- c/providers/flagd/pkg/service/in_process/service.go +++ i/providers/flagd/pkg/service/in_process/service.go @@ -3,7 +3,6 @@ package process import ( "context" "fmt" - "regexp" parallel "sync" @@ -33,6 +32,7 @@ type InProcess struct { sync sync.ISync syncEnd context.CancelFunc wg parallel.WaitGroup + ContextValues map[string]any } type Configuration struct { @@ -113,6 +113,10 @@ func (i *InProcess) Init() error { case data := <-syncChan: // re-syncs are ignored as we only support single flag sync source changes, _, err := i.evaluator.SetState(data) + if data.SyncContext != nil { + i.ContextValues = data.SyncContext.AsMap() + } + if err != nil { i.events <- of.Event{ ProviderName: "flagd", EventType: of.ProviderError, diff --git c/providers/flagd/pkg/service/in_process/service_grpc_test.go i/providers/flagd/pkg/service/in_process/service_grpc_test.go index a1e7c1c..c271161 100644 --- c/providers/flagd/pkg/service/in_process/service_grpc_test.go +++ i/providers/flagd/pkg/service/in_process/service_grpc_test.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/open-feature/go-sdk/openfeature" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/structpb" "log" "net" "testing" @@ -38,11 +39,20 @@ func TestInProcessProviderEvaluation(t *testing.T) { t.Fatal(err) } + m := make(map[string]any) + + m["context"] = "set" + syncContext, err := structpb.NewStruct(m) + if err != nil { + t.Fatal(err) + } + bufServ := &bufferedServer{ listener: listen, mockResponses: []*v1.SyncFlagsResponse{ { FlagConfiguration: flagRsp, + SyncContext: syncContext, }, }, fetchAllFlagsResponse: nil, @@ -112,6 +122,10 @@ func TestInProcessProviderEvaluation(t *testing.T) { if scope != detail.FlagMetadata["scope"] { t.Fatalf("Wrong scope value. Expected %s, but got %s", scope, detail.FlagMetadata["scope"]) } + + if len(inProcessService.ContextValues) == 0 { + t.Fatal("Expected context_values to be present, but got none") + } } // custom name resolver @@ -138,7 +152,7 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { } inProcessService := NewInProcessService(Configuration{ - TargetUri: "envoy://localhost:9211/foo.service", + TargetUri: "envoy://localhost:9211/foo.service", Selector: scope, TLSEnabled: false, }) @@ -201,7 +215,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { } } - // bufferedServer - a mock grpc service backed by buffered connection type bufferedServer struct { listener net.Listener diff --git c/providers/flagd/pkg/sync_context_hook.go i/providers/flagd/pkg/sync_context_hook.go new file mode 100644 index 0000000..ebb6590 --- /dev/null +++ i/providers/flagd/pkg/sync_context_hook.go @@ -0,0 +1,21 @@ +package flagd + +import ( + "context" + "github.com/open-feature/go-sdk/openfeature" +) + +type Supplier func() *openfeature.EvaluationContext + +type SyncContextHook struct { + openfeature.UnimplementedHook + contextSupplier Supplier +} + +func NewSyncContextHook(contextSupplier Supplier) SyncContextHook { + return SyncContextHook{contextSupplier: contextSupplier} +} + +func (hook SyncContextHook) Before(ctx context.Context, hookContext openfeature.HookContext, hookHints openfeature.HookHints) (*openfeature.EvaluationContext, error) { + return hook.contextSupplier(), nil +}
1 parent 0d3648c commit 340cc23

File tree

4 files changed

+53
-7
lines changed

4 files changed

+53
-7
lines changed

providers/flagd/pkg/provider.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ type Provider struct {
2222
service IService
2323
status of.State
2424
mtx parallel.RWMutex
25-
26-
eventStream chan of.Event
25+
hooks []of.Hook
26+
eventStream chan of.Event
2727
}
2828

2929
func NewProvider(opts ...ProviderOption) (*Provider, error) {
@@ -38,6 +38,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
3838
eventStream: make(chan of.Event),
3939
providerConfiguration: providerConfiguration,
4040
status: of.NotReadyState,
41+
hooks: []of.Hook{},
4142
}
4243

4344
cacheService := cache.NewCacheService(
@@ -60,7 +61,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
6061
provider.providerConfiguration.log,
6162
provider.providerConfiguration.EventStreamConnectionMaxAttempts)
6263
} else if provider.providerConfiguration.Resolver == inProcess {
63-
service = process.NewInProcessService(process.Configuration{
64+
inprocess_service := process.NewInProcessService(process.Configuration{
6465
Host: provider.providerConfiguration.Host,
6566
Port: provider.providerConfiguration.Port,
6667
ProviderID: provider.providerConfiguration.ProviderId,
@@ -73,6 +74,13 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
7374
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
7475
GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride,
7576
})
77+
provider.hooks = append(provider.hooks, NewSyncContextHook(
78+
func() *of.EvaluationContext {
79+
evaluationContext := of.NewTargetlessEvaluationContext(inprocess_service.ContextValues)
80+
return &evaluationContext
81+
}))
82+
service = inprocess_service
83+
7684
} else {
7785
service = process.NewInProcessService(process.Configuration{
7886
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
@@ -145,7 +153,7 @@ func (p *Provider) EventChannel() <-chan of.Event {
145153

146154
// Hooks flagd provider does not have any hooks, returns empty slice
147155
func (p *Provider) Hooks() []of.Hook {
148-
return []of.Hook{}
156+
return p.hooks
149157
}
150158

151159
// Metadata returns value of Metadata (name of current service, exposed to openfeature sdk)

providers/flagd/pkg/service/in_process/service.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package process
33
import (
44
"context"
55
"fmt"
6-
76
"regexp"
87
parallel "sync"
98

@@ -33,6 +32,7 @@ type InProcess struct {
3332
sync sync.ISync
3433
syncEnd context.CancelFunc
3534
wg parallel.WaitGroup
35+
ContextValues map[string]any
3636
}
3737

3838
type Configuration struct {
@@ -113,6 +113,10 @@ func (i *InProcess) Init() error {
113113
case data := <-syncChan:
114114
// re-syncs are ignored as we only support single flag sync source
115115
changes, _, err := i.evaluator.SetState(data)
116+
if data.SyncContext != nil {
117+
i.ContextValues = data.SyncContext.AsMap()
118+
}
119+
116120
if err != nil {
117121
i.events <- of.Event{
118122
ProviderName: "flagd", EventType: of.ProviderError,

providers/flagd/pkg/service/in_process/service_grpc_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"github.com/open-feature/go-sdk/openfeature"
99
"google.golang.org/grpc"
10+
"google.golang.org/protobuf/types/known/structpb"
1011
"log"
1112
"net"
1213
"testing"
@@ -38,11 +39,20 @@ func TestInProcessProviderEvaluation(t *testing.T) {
3839
t.Fatal(err)
3940
}
4041

42+
m := make(map[string]any)
43+
44+
m["context"] = "set"
45+
syncContext, err := structpb.NewStruct(m)
46+
if err != nil {
47+
t.Fatal(err)
48+
}
49+
4150
bufServ := &bufferedServer{
4251
listener: listen,
4352
mockResponses: []*v1.SyncFlagsResponse{
4453
{
4554
FlagConfiguration: flagRsp,
55+
SyncContext: syncContext,
4656
},
4757
},
4858
fetchAllFlagsResponse: nil,
@@ -112,6 +122,10 @@ func TestInProcessProviderEvaluation(t *testing.T) {
112122
if scope != detail.FlagMetadata["scope"] {
113123
t.Fatalf("Wrong scope value. Expected %s, but got %s", scope, detail.FlagMetadata["scope"])
114124
}
125+
126+
if len(inProcessService.ContextValues) == 0 {
127+
t.Fatal("Expected context_values to be present, but got none")
128+
}
115129
}
116130

117131
// custom name resolver
@@ -138,7 +152,7 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
138152
}
139153

140154
inProcessService := NewInProcessService(Configuration{
141-
TargetUri: "envoy://localhost:9211/foo.service",
155+
TargetUri: "envoy://localhost:9211/foo.service",
142156
Selector: scope,
143157
TLSEnabled: false,
144158
})
@@ -201,7 +215,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
201215
}
202216
}
203217

204-
205218
// bufferedServer - a mock grpc service backed by buffered connection
206219
type bufferedServer struct {
207220
listener net.Listener
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package flagd
2+
3+
import (
4+
"context"
5+
"github.com/open-feature/go-sdk/openfeature"
6+
)
7+
8+
type Supplier func() *openfeature.EvaluationContext
9+
10+
type SyncContextHook struct {
11+
openfeature.UnimplementedHook
12+
contextSupplier Supplier
13+
}
14+
15+
func NewSyncContextHook(contextSupplier Supplier) SyncContextHook {
16+
return SyncContextHook{contextSupplier: contextSupplier}
17+
}
18+
19+
func (hook SyncContextHook) Before(ctx context.Context, hookContext openfeature.HookContext, hookHints openfeature.HookHints) (*openfeature.EvaluationContext, error) {
20+
return hook.contextSupplier(), nil
21+
}

0 commit comments

Comments
 (0)