Skip to content

Commit b70fa06

Browse files
feat: add server-side deadline to sync service (#1638)
<!-- Please use this template for your pull request. --> <!-- Please use the sections that you need and delete other sections --> ## This PR <!-- add the description of the PR here --> - adds server side deadline for sync and event streams configurable via cmd argument `--stream-deadline` ### Related Issues #1582 ### Notes <!-- any additional notes for this PR --> ### How to test 1. Run flagd with `--stream-deadline 3s` // 3s can be replaced with any duration the deadline should have 2. Test Event Stream deadline: run `grpcurl -v --proto schemas/protobuf/flagd/evaluation/v1/evaluation.proto -plaintext localhost:8013 flagd.evaluation.v1.Service/EventStream` or similar depending on your flagd settings to check if the deadline exceeded is returned after the specified duration 3. Test Sync Service Stream deadline: run `grpcurl -v --proto schemas/protobuf/flagd/sync/v1/sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags` or similar depending on your flagd settings to check if the deadline exceeded is returned after the specified duration Signed-off-by: alexandra.oberaigner <[email protected]> Signed-off-by: Todd Baert <[email protected]>
1 parent ba34815 commit b70fa06

File tree

10 files changed

+243
-89
lines changed

10 files changed

+243
-89
lines changed

core/pkg/service/iservice.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package service
22

33
import (
44
"context"
5+
"time"
56

67
"connectrpc.com/connect"
78
)
@@ -34,6 +35,7 @@ type Configuration struct {
3435
Options []connect.HandlerOption
3536
ContextValues map[string]any
3637
HeaderToContextKeyMappings map[string]string
38+
StreamDeadline time.Duration
3739
}
3840

3941
/*

docs/reference/flagd-cli/flagd_start.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ flagd start [flags]
2929
-k, --server-key-path string Server side tls key path
3030
-d, --socket-path string Flagd unix socket path. With grpc the evaluations service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
3131
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
32+
--stream-deadline duration Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).
3233
-g, --sync-port int32 gRPC Sync port (default 8015)
3334
-e, --sync-socket-path string Flagd sync service socket path. With grpc the sync service will be available on this address.
3435
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS or Azure Blob. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.

flagd/cmd/start.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const (
3838
uriFlagName = "uri"
3939
contextValueFlagName = "context-value"
4040
headerToContextKeyFlagName = "context-from-header"
41+
streamDeadlineFlagName = "stream-deadline"
4142
)
4243

4344
func init() {
@@ -85,8 +86,9 @@ func init() {
8586
"from disk")
8687
flags.StringToStringP(contextValueFlagName, "X", map[string]string{}, "add arbitrary key value pairs "+
8788
"to the flag evaluation context")
88-
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map " +
89+
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
8990
"header values to context values, where key is Header name, value is context key")
91+
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
9092

9193
bindFlags(flags)
9294
}
@@ -111,6 +113,7 @@ func bindFlags(flags *pflag.FlagSet) {
111113
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
112114
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
113115
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
116+
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
114117
}
115118

116119
// startCmd represents the start command
@@ -182,6 +185,7 @@ var startCmd = &cobra.Command{
182185
ServiceSocketPath: viper.GetString(socketPathFlagName),
183186
SyncServicePort: viper.GetUint16(syncPortFlagName),
184187
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
188+
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
185189
SyncProviders: syncProviders,
186190
ContextValues: contextValuesToMap,
187191
HeaderToContextKeyMappings: headerToContextKeyMappings,

flagd/pkg/runtime/from_config.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Config struct {
3838
ServiceSocketPath string
3939
SyncServicePort uint16
4040
SyncServiceSocketPath string
41+
StreamDeadline time.Duration
4142

4243
SyncProviders []sync.SourceConfig
4344
CORS []string
@@ -115,14 +116,15 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
115116

116117
// flag sync service
117118
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
118-
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
119-
Port: config.SyncServicePort,
120-
Sources: sources,
121-
Store: s,
122-
ContextValues: config.ContextValues,
123-
KeyPath: config.ServiceKeyPath,
124-
CertPath: config.ServiceCertPath,
125-
SocketPath: config.SyncServiceSocketPath,
119+
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
120+
Port: config.SyncServicePort,
121+
Sources: sources,
122+
Store: s,
123+
ContextValues: config.ContextValues,
124+
KeyPath: config.ServiceKeyPath,
125+
CertPath: config.ServiceCertPath,
126+
SocketPath: config.SyncServiceSocketPath,
127+
StreamDeadline: config.StreamDeadline,
126128
})
127129
if err != nil {
128130
return nil, fmt.Errorf("error creating sync service: %w", err)
@@ -158,6 +160,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
158160
Options: options,
159161
ContextValues: config.ContextValues,
160162
HeaderToContextKeyMappings: config.HeaderToContextKeyMappings,
163+
StreamDeadline: config.StreamDeadline,
161164
},
162165
SyncImpl: iSyncs,
163166
}, nil

flagd/pkg/service/flag-evaluation/connect_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene
173173
s.metrics,
174174
svcConf.ContextValues,
175175
svcConf.HeaderToContextKeyMappings,
176+
svcConf.StreamDeadline,
176177
)
177178

178179
_, newHandler := evaluationV1.NewServiceHandler(newFes, append(svcConf.Options, marshalOpts)...)

flagd/pkg/service/flag-evaluation/flag_evaluator_v2.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package service
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -27,6 +28,7 @@ type FlagEvaluationService struct {
2728
flagEvalTracer trace.Tracer
2829
contextValues map[string]any
2930
headerToContextKeyMappings map[string]string
31+
deadline time.Duration
3032
}
3133

3234
// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
@@ -36,6 +38,7 @@ func NewFlagEvaluationService(log *logger.Logger,
3638
metricsRecorder telemetry.IMetricsRecorder,
3739
contextValues map[string]any,
3840
headerToContextKeyMappings map[string]string,
41+
streamDeadline time.Duration,
3942
) *FlagEvaluationService {
4043
svc := &FlagEvaluationService{
4144
logger: log,
@@ -45,6 +48,7 @@ func NewFlagEvaluationService(log *logger.Logger,
4548
flagEvalTracer: otel.Tracer("flagd.evaluation.v1"),
4649
contextValues: contextValues,
4750
headerToContextKeyMappings: headerToContextKeyMappings,
51+
deadline: streamDeadline,
4852
}
4953

5054
if metricsRecorder != nil {
@@ -143,6 +147,15 @@ func (s *FlagEvaluationService) EventStream(
143147
req *connect.Request[evalV1.EventStreamRequest],
144148
stream *connect.ServerStream[evalV1.EventStreamResponse],
145149
) error {
150+
serviceCtx := ctx
151+
// attach server-side stream deadline to context
152+
if s.deadline != 0 {
153+
streamDeadline := time.Now().Add(s.deadline)
154+
deadlineCtx, cancel := context.WithDeadline(ctx, streamDeadline)
155+
serviceCtx = deadlineCtx
156+
defer cancel()
157+
}
158+
146159
requestNotificationChan := make(chan service.Notification, 1)
147160
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
148161
defer s.eventingConfiguration.Unsubscribe(req)
@@ -171,7 +184,11 @@ func (s *FlagEvaluationService) EventStream(
171184
if err != nil {
172185
s.logger.Error(err.Error())
173186
}
174-
case <-ctx.Done():
187+
case <-serviceCtx.Done():
188+
if errors.Is(serviceCtx.Err(), context.DeadlineExceeded) {
189+
s.logger.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
190+
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("%s", "stream closed due to server-side timeout"))
191+
}
175192
return nil
176193
}
177194
}

flagd/pkg/service/flag-evaluation/flag_evaluator_v2_test.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestConnectServiceV2_ResolveAll(t *testing.T) {
104104
).AnyTimes()
105105

106106
metrics, exp := getMetricReader()
107-
s := NewFlagEvaluationService(logger.NewLogger(nil, false), eval, &eventingConfiguration{}, metrics, nil, nil)
107+
s := NewFlagEvaluationService(logger.NewLogger(nil, false), eval, &eventingConfiguration{}, metrics, nil, nil, 0)
108108

109109
// when
110110
got, err := s.ResolveAll(context.Background(), connect.NewRequest(tt.req))
@@ -222,6 +222,7 @@ func TestFlag_EvaluationV2_ResolveBoolean(t *testing.T) {
222222
metrics,
223223
nil,
224224
nil,
225+
0,
225226
)
226227
got, err := s.ResolveBoolean(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
227228
if (err != nil) && !errors.Is(err, tt.wantErr) {
@@ -279,6 +280,7 @@ func BenchmarkFlag_EvaluationV2_ResolveBoolean(b *testing.B) {
279280
metrics,
280281
nil,
281282
nil,
283+
0,
282284
)
283285
b.Run(name, func(b *testing.B) {
284286
for i := 0; i < b.N; i++ {
@@ -379,6 +381,7 @@ func TestFlag_EvaluationV2_ResolveString(t *testing.T) {
379381
metrics,
380382
nil,
381383
nil,
384+
0,
382385
)
383386
got, err := s.ResolveString(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
384387
if (err != nil) && !errors.Is(err, tt.wantErr) {
@@ -436,6 +439,7 @@ func BenchmarkFlag_EvaluationV2_ResolveString(b *testing.B) {
436439
metrics,
437440
nil,
438441
nil,
442+
0,
439443
)
440444
b.Run(name, func(b *testing.B) {
441445
for i := 0; i < b.N; i++ {
@@ -535,6 +539,7 @@ func TestFlag_EvaluationV2_ResolveFloat(t *testing.T) {
535539
metrics,
536540
nil,
537541
nil,
542+
0,
538543
)
539544
got, err := s.ResolveFloat(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
540545
if (err != nil) && !errors.Is(err, tt.wantErr) {
@@ -592,6 +597,7 @@ func BenchmarkFlag_EvaluationV2_ResolveFloat(b *testing.B) {
592597
metrics,
593598
nil,
594599
nil,
600+
0,
595601
)
596602
b.Run(name, func(b *testing.B) {
597603
for i := 0; i < b.N; i++ {
@@ -691,6 +697,7 @@ func TestFlag_EvaluationV2_ResolveInt(t *testing.T) {
691697
metrics,
692698
nil,
693699
nil,
700+
0,
694701
)
695702
got, err := s.ResolveInt(tt.functionArgs.ctx, connect.NewRequest(tt.functionArgs.req))
696703
if (err != nil) && !errors.Is(err, tt.wantErr) {
@@ -748,6 +755,7 @@ func BenchmarkFlag_EvaluationV2_ResolveInt(b *testing.B) {
748755
metrics,
749756
nil,
750757
nil,
758+
0,
751759
)
752760
b.Run(name, func(b *testing.B) {
753761
for i := 0; i < b.N; i++ {
@@ -850,6 +858,7 @@ func TestFlag_EvaluationV2_ResolveObject(t *testing.T) {
850858
metrics,
851859
nil,
852860
nil,
861+
0,
853862
)
854863

855864
outParsed, err := structpb.NewStruct(tt.evalFields.result)
@@ -915,6 +924,7 @@ func BenchmarkFlag_EvaluationV2_ResolveObject(b *testing.B) {
915924
metrics,
916925
nil,
917926
nil,
927+
0,
918928
)
919929
if name != "eval returns error" {
920930
outParsed, err := structpb.NewStruct(tt.evalFields.result)
@@ -1004,9 +1014,9 @@ func Test_mergeContexts(t *testing.T) {
10041014
{
10051015
name: "merge contexts with no headers, with no header-context mappings",
10061016
args: args{
1007-
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1008-
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1009-
headers: http.Header{},
1017+
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1018+
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1019+
headers: http.Header{},
10101020
headerToContextKeyMappings: map[string]string{},
10111021
},
10121022
// static context should "win"
@@ -1015,9 +1025,9 @@ func Test_mergeContexts(t *testing.T) {
10151025
{
10161026
name: "merge contexts with headers, with no header-context mappings",
10171027
args: args{
1018-
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1019-
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1020-
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
1028+
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1029+
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1030+
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
10211031
headerToContextKeyMappings: map[string]string{},
10221032
},
10231033
// static context should "win"
@@ -1026,9 +1036,9 @@ func Test_mergeContexts(t *testing.T) {
10261036
{
10271037
name: "merge contexts with no headers, with header-context mappings",
10281038
args: args{
1029-
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1030-
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1031-
headers: http.Header{},
1039+
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1040+
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1041+
headers: http.Header{},
10321042
headerToContextKeyMappings: map[string]string{"X-key": "k2"},
10331043
},
10341044
// static context should "win"
@@ -1037,9 +1047,9 @@ func Test_mergeContexts(t *testing.T) {
10371047
{
10381048
name: "merge contexts with headers, with header-context mappings",
10391049
args: args{
1040-
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1041-
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1042-
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
1050+
clientContext: map[string]any{"k1": "v1", "k2": "v2"},
1051+
configContext: map[string]any{"k2": "v22", "k3": "v3"},
1052+
headers: http.Header{"X-key": []string{"value"}, "X-token": []string{"token"}},
10431053
headerToContextKeyMappings: map[string]string{"X-key": "k2"},
10441054
},
10451055
// header context should "win"

flagd/pkg/service/flag-sync/handler.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package sync
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"google.golang.org/grpc/codes"
8+
"google.golang.org/grpc/status"
9+
"time"
610

711
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
812
syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
@@ -15,14 +19,22 @@ type syncHandler struct {
1519
mux *Multiplexer
1620
log *logger.Logger
1721
contextValues map[string]any
22+
deadline time.Duration
1823
}
1924

2025
func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
2126
muxPayload := make(chan payload, 1)
2227
selector := req.GetSelector()
23-
2428
ctx := server.Context()
2529

30+
// attach server-side stream deadline to context
31+
if s.deadline != 0 {
32+
streamDeadline := time.Now().Add(s.deadline)
33+
deadlineCtx, cancel := context.WithDeadline(server.Context(), streamDeadline)
34+
ctx = deadlineCtx
35+
defer cancel()
36+
}
37+
2638
err := s.mux.Register(ctx, selector, muxPayload)
2739
if err != nil {
2840
return err
@@ -38,6 +50,11 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
3850
}
3951
case <-ctx.Done():
4052
s.mux.Unregister(ctx, selector)
53+
54+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
55+
s.log.Debug(fmt.Sprintf("server-side deadline of %s exceeded, exiting stream request with grpc error code 4", s.deadline.String()))
56+
return status.Error(codes.DeadlineExceeded, "stream closed due to server-side timeout")
57+
}
4158
s.log.Debug("context complete and exiting stream request")
4259
return nil
4360
}

flagd/pkg/service/flag-sync/sync_service.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ type ISyncService interface {
2525
}
2626

2727
type SvcConfigurations struct {
28-
Logger *logger.Logger
29-
Port uint16
30-
Sources []string
31-
Store *store.State
32-
ContextValues map[string]any
33-
CertPath string
34-
KeyPath string
35-
SocketPath string
28+
Logger *logger.Logger
29+
Port uint16
30+
Sources []string
31+
Store *store.State
32+
ContextValues map[string]any
33+
CertPath string
34+
KeyPath string
35+
SocketPath string
36+
StreamDeadline time.Duration
3637
}
3738

3839
type Service struct {
@@ -84,6 +85,7 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
8485
mux: mux,
8586
log: l,
8687
contextValues: cfg.ContextValues,
88+
deadline: cfg.StreamDeadline,
8789
})
8890

8991
var lis net.Listener

0 commit comments

Comments
 (0)