From 4f4f16338f8ab9e37034d6bdb634821f0a93a270 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Sun, 7 Sep 2025 19:40:14 +0200 Subject: [PATCH 1/9] test: improve testing and fixing test state issues Signed-off-by: Simon Schrottner --- providers/flagd/e2e/config_test.go | 38 +++++--- tests/flagd/testframework/config_steps.go | 43 ++++----- tests/flagd/testframework/context_steps.go | 96 +++++++++++++------ tests/flagd/testframework/event_steps.go | 53 ++++++---- tests/flagd/testframework/flag_steps.go | 81 ++++++++++------ tests/flagd/testframework/provider_steps.go | 24 ++--- tests/flagd/testframework/step_definitions.go | 81 +++++----------- tests/flagd/testframework/testbed_runner.go | 40 -------- tests/flagd/testframework/utils.go | 61 +++++++++++- 9 files changed, 287 insertions(+), 230 deletions(-) diff --git a/providers/flagd/e2e/config_test.go b/providers/flagd/e2e/config_test.go index 4ef3b558b..c68262eb3 100644 --- a/providers/flagd/e2e/config_test.go +++ b/providers/flagd/e2e/config_test.go @@ -1,7 +1,7 @@ package e2e import ( - context2 "context" + "context" "testing" "github.com/cucumber/godog" @@ -40,23 +40,31 @@ func TestConfiguration(t *testing.T) { t.Run(tc.name, func(t *testing.T) { suite := godog.TestSuite{ Name: "flagd-config-" + tc.name, - ScenarioInitializer: func(context *godog.ScenarioContext) { - state := testframework.TestState{ - EnvVars: make(map[string]string), - EvalContext: make(map[string]interface{}), - EventChannel: make(chan testframework.EventRecord, 100), - } - testframework.InitializeConfigScenario(context, &state) - context.After(func(ctx context2.Context, sc *godog.Scenario, err error) (context2.Context, error) { - state.CleanupEnvironmentVariables() - return ctx, nil + ScenarioInitializer: func(sc *godog.ScenarioContext) { + + testframework.InitializeConfigScenario(sc) + sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { + state := &testframework.TestState{ + EnvVars: make(map[string]string), + EvalContext: make(map[string]interface{}), + EventChannel: make(chan testframework.EventRecord, 100), + } + + return context.WithValue(ctx, testframework.TestStateKey{}, state), nil + }) + sc.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { + if state, ok := ctx.Value(testframework.TestStateKey{}).(*testframework.TestState); ok { + state.CleanupEnvironmentVariables() + } + return ctx, err }) }, Options: &godog.Options{ - Format: "pretty", - Paths: []string{"../flagd-testbed/gherkin/config.feature"}, - Tags: tc.tags, - TestingT: t, + Format: "pretty", + Paths: []string{"../flagd-testbed/gherkin/config.feature"}, + Tags: tc.tags, + TestingT: t, + DefaultContext: context.Background(), }, } diff --git a/tests/flagd/testframework/config_steps.go b/tests/flagd/testframework/config_steps.go index 013995329..c24f4e8b0 100644 --- a/tests/flagd/testframework/config_steps.go +++ b/tests/flagd/testframework/config_steps.go @@ -12,10 +12,6 @@ import ( flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" ) -// Type definitions moved to types.go - -// Context keys moved to types.go - // ignoredOptions a list of options that are currently not supported var ignoredOptions = []string{ "deadlineMs", @@ -28,33 +24,32 @@ var ignoredOptions = []string{ } // InitializeConfigScenario initializes the config test scenario -func InitializeConfigScenario(ctx *godog.ScenarioContext, state *TestState) { - ctx.Step(`^a config was initialized$`, state.aConfigWasInitialized) - ctx.Step(`^an environment variable "([^"]*)" with value "([^"]*)"$`, state.anEnvironmentVariableWithValue) - ctx.Step(`^an option "([^"]*)" of type "([^"]*)" with value "([^"]*)"$`, state.anOptionOfTypeWithValue) +func InitializeConfigScenario(ctx *godog.ScenarioContext) { + ctx.Step(`^a config was initialized$`, withStateNoArgs((*TestState).aConfigWasInitialized)) + ctx.Step(`^an environment variable "([^"]*)" with value "([^"]*)"$`, withState2Args((*TestState).anEnvironmentVariableWithValue)) + ctx.Step(`^an option "([^"]*)" of type "([^"]*)" with value "([^"]*)"$`, withState3Args((*TestState).anOptionOfTypeWithValue)) ctx.Step( `^the option "([^"]*)" of type "([^"]*)" should have the value "([^"]*)"$`, - state.theOptionOfTypeShouldHaveTheValue, + withState3ArgsReturningContext((*TestState).theOptionOfTypeShouldHaveTheValue), ) - ctx.Step(`^we should have an error$`, state.weShouldHaveAnError) + ctx.Step(`^we should have an error$`, withStateNoArgsReturningContext((*TestState).weShouldHaveAnError)) } -func (s *TestState) aConfigWasInitialized(ctx context.Context) { - opts := s.GenerateOpts() +// State methods - these now expect state as first parameter +func (s *TestState) aConfigWasInitialized(ctx context.Context) error { + opts := s.GenerateOpts() providerConfiguration, err := flagd.NewProviderConfiguration(opts) - s.ProviderConfig = ErrorAwareProviderConfiguration{ Configuration: providerConfiguration, Error: err, } + return nil } func (s *TestState) GenerateOpts() []flagd.ProviderOption { providerOptions := s.ProviderOptions - var opts []flagd.ProviderOption - for _, providerOption := range providerOptions { if !slices.Contains(ignoredOptions, providerOption.Option) { opts = append( @@ -66,25 +61,24 @@ func (s *TestState) GenerateOpts() []flagd.ProviderOption { return opts } -func (s *TestState) anEnvironmentVariableWithValue(key, value string) { - +func (s *TestState) anEnvironmentVariableWithValue(ctx context.Context, key, value string) error { s.EnvVars[key] = os.Getenv(key) err := os.Setenv(key, value) if err != nil { - panic(err) + return fmt.Errorf("failed to set environment variable %s: %w", key, err) } + return nil } -func (s *TestState) anOptionOfTypeWithValue(ctx context.Context, option, valueType, value string) { +func (s *TestState) anOptionOfTypeWithValue(ctx context.Context, option, valueType, value string) error { providerOptions := s.ProviderOptions - data := ProviderOption{ Option: option, ValueType: valueType, Value: value, } - s.ProviderOptions = append(providerOptions, data) + return nil } func (s *TestState) theOptionOfTypeShouldHaveTheValue( @@ -95,7 +89,6 @@ func (s *TestState) theOptionOfTypeShouldHaveTheValue( } errorAwareConfiguration := s.ProviderConfig - // gherkins null value needs to converted to an empty string if expectedValueS == "null" { expectedValueS = "" @@ -103,7 +96,6 @@ func (s *TestState) theOptionOfTypeShouldHaveTheValue( config := errorAwareConfiguration.Configuration currentValue := reflect.ValueOf(config).Elem().FieldByName(ToFieldName(option)) - converter := NewValueConverter() var expectedValue = converter.ConvertToReflectValue(valueType, expectedValueS, currentValue.Type()) @@ -115,18 +107,15 @@ func (s *TestState) theOptionOfTypeShouldHaveTheValue( fmt.Sprintf("%v", currentValue), ) } - return ctx, nil } func (s *TestState) weShouldHaveAnError(ctx context.Context) (context.Context, error) { errorAwareConfiguration := s.ProviderConfig - if errorAwareConfiguration.Error == nil { return ctx, errors.New("configuration check succeeded, but should not") - } else { - return ctx, nil } + return ctx, nil } func genericProviderOption(option, valueType, value string) flagd.ProviderOption { diff --git a/tests/flagd/testframework/context_steps.go b/tests/flagd/testframework/context_steps.go index fe27a5ef9..621cf81a6 100644 --- a/tests/flagd/testframework/context_steps.go +++ b/tests/flagd/testframework/context_steps.go @@ -1,24 +1,72 @@ package testframework import ( + "context" "fmt" - "github.com/cucumber/godog" ) // initializeContextSteps registers evaluation context step definitions -func initializeContextSteps(ctx *godog.ScenarioContext, state *TestState) { - ctx.Step(`^a context containing a key "([^"]*)", with type "([^"]*)" and with value "([^"]*)"$`, state.addContextValue) - ctx.Step(`^an empty context$`, state.clearContext) - ctx.Step(`^a context with the following keys:$`, state.addContextFromTable) +func InitializeContextSteps(ctx *godog.ScenarioContext) { + ctx.Step(`^a context containing a key "([^"]*)", with type "([^"]*)" and with value "([^"]*)"$`, + withState3Args((*TestState).addContextValue)) + ctx.Step(`^an empty context$`, + withStateNoArgs((*TestState).clearContext)) + ctx.Step(`^a context with the following keys:$`, + withStateTable((*TestState).addContextFromTable)) // Missing step definitions - added as stubs - ctx.Step(`^a context containing a nested property with outer key "([^"]*)" and inner key "([^"]*)", with value "([^"]*)"$`, state.addNestedContextProperty) - ctx.Step(`^a context containing a targeting key with value "([^"]*)"$`, state.addTargetingKeyToContext) + ctx.Step(`^a context containing a nested property with outer key "([^"]*)" and inner key "([^"]*)", with value "([^"]*)"$`, + withState3Args((*TestState).addNestedContextProperty)) + ctx.Step(`^a context containing a targeting key with value "([^"]*)"$`, + withState1Arg((*TestState).addTargetingKeyToContext)) + + // Context validation steps + ctx.Step(`^the context should contain key "([^"]*)"$`, + withState1Arg((*TestState).contextContainsKeyStep)) + ctx.Step(`^the context should be empty$`, + withStateNoArgs((*TestState).contextIsEmptyStep)) + ctx.Step(`^the context should have (\d+) keys?$`, + withStateIntArg((*TestState).contextShouldHaveKeysStep)) + ctx.Step(`^the context value for "([^"]*)" should be "([^"]*)" of type "([^"]*)"$`, + withState3Args((*TestState).contextValueShouldBeStep)) +} + +// Additional helper functions for different signatures +func withState1Arg(fn func(*TestState, context.Context, string) error) func(context.Context, string) error { + return func(ctx context.Context, arg1 string) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, arg1) + } +} + +func withStateTable(fn func(*TestState, context.Context, *godog.Table) error) func(context.Context, *godog.Table) error { + return func(ctx context.Context, table *godog.Table) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, table) + } } +func withStateIntArg(fn func(*TestState, context.Context, int) error) func(context.Context, int) error { + return func(ctx context.Context, arg1 int) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, arg1) + } +} + +// State methods - these now expect context as first parameter after state + // addContextValue adds a typed value to the evaluation context -func (s *TestState) addContextValue(key, valueType, value string) error { +func (s *TestState) addContextValue(ctx context.Context, key, valueType, value string) error { convertedValue, err := convertValueForSteps(value, valueType) if err != nil { return fmt.Errorf("failed to convert context value for key %s: %w", key, err) @@ -29,13 +77,13 @@ func (s *TestState) addContextValue(key, valueType, value string) error { } // clearContext removes all values from the evaluation context -func (s *TestState) clearContext() error { +func (s *TestState) clearContext(ctx context.Context) error { s.EvalContext = make(map[string]interface{}) return nil } // addContextFromTable adds multiple context values from a Gherkin data table -func (s *TestState) addContextFromTable(table *godog.Table) error { +func (s *TestState) addContextFromTable(ctx context.Context, table *godog.Table) error { if len(table.Rows) < 2 { return fmt.Errorf("table must have at least header row and one data row") } @@ -72,7 +120,7 @@ func (s *TestState) addContextFromTable(table *godog.Table) error { valueType := row.Cells[typeCol].Value value := row.Cells[valueCol].Value - if err := s.addContextValue(key, valueType, value); err != nil { + if err := s.addContextValue(ctx, key, valueType, value); err != nil { return err } } @@ -138,28 +186,20 @@ func (s *TestState) contextIsEmpty() error { return nil } -// Additional step definitions for context validation - -// registerContextValidationSteps adds validation steps for evaluation context -func (s *TestState) registerContextValidationSteps(ctx *godog.ScenarioContext) { - ctx.Step(`^the context should contain key "([^"]*)"$`, s.contextContainsKeyStep) - ctx.Step(`^the context should be empty$`, s.contextIsEmptyStep) - ctx.Step(`^the context should have (\d+) keys?$`, s.contextShouldHaveKeysStep) - ctx.Step(`^the context value for "([^"]*)" should be "([^"]*)" of type "([^"]*)"$`, s.contextValueShouldBeStep) -} +// Step definition wrappers // contextContainsKeyStep is a step definition wrapper -func (s *TestState) contextContainsKeyStep(key string) error { +func (s *TestState) contextContainsKeyStep(ctx context.Context, key string) error { return s.contextContainsKey(key) } // contextIsEmptyStep is a step definition wrapper -func (s *TestState) contextIsEmptyStep() error { +func (s *TestState) contextIsEmptyStep(ctx context.Context) error { return s.contextIsEmpty() } // contextShouldHaveKeysStep checks the number of keys in context -func (s *TestState) contextShouldHaveKeysStep(expectedCount int) error { +func (s *TestState) contextShouldHaveKeysStep(ctx context.Context, expectedCount int) error { actualCount := s.contextSize() if actualCount != expectedCount { return fmt.Errorf("expected context to have %d keys, but it has %d", expectedCount, actualCount) @@ -168,7 +208,7 @@ func (s *TestState) contextShouldHaveKeysStep(expectedCount int) error { } // contextValueShouldBeStep checks a specific context value -func (s *TestState) contextValueShouldBeStep(key, expectedValue, valueType string) error { +func (s *TestState) contextValueShouldBeStep(ctx context.Context, key, expectedValue, valueType string) error { convertedExpected, err := convertValueForSteps(expectedValue, valueType) if err != nil { return fmt.Errorf("failed to convert expected value: %w", err) @@ -177,15 +217,15 @@ func (s *TestState) contextValueShouldBeStep(key, expectedValue, valueType strin return s.contextValueEquals(key, convertedExpected) } -// Missing step definition implementations - added as stubs that throw errors +// Missing step definition implementations // addNestedContextProperty adds a nested property to evaluation context -func (s *TestState) addNestedContextProperty(outerKey, innerKey, value string) error { - return s.addContextValue(outerKey, "Object", fmt.Sprintf("{\"%s\": \"%s\"}", innerKey, value)) +func (s *TestState) addNestedContextProperty(ctx context.Context, outerKey, innerKey, value string) error { + return s.addContextValue(ctx, outerKey, "Object", fmt.Sprintf("{\"%s\": \"%s\"}", innerKey, value)) } // addTargetingKeyToContext adds a targeting key to evaluation context -func (s *TestState) addTargetingKeyToContext(value string) error { +func (s *TestState) addTargetingKeyToContext(ctx context.Context, value string) error { s.TargetingKey = value return nil } diff --git a/tests/flagd/testframework/event_steps.go b/tests/flagd/testframework/event_steps.go index cd809f0cd..c428192d0 100644 --- a/tests/flagd/testframework/event_steps.go +++ b/tests/flagd/testframework/event_steps.go @@ -1,6 +1,7 @@ package testframework import ( + "context" "fmt" "strings" "time" @@ -9,24 +10,40 @@ import ( "github.com/open-feature/go-sdk/openfeature" ) -// initializeEventSteps registers event handling step definitions -func initializeEventSteps(ctx *godog.ScenarioContext, state *TestState) { +// InitializeEventSteps registers event handling step definitions +func InitializeEventSteps(ctx *godog.ScenarioContext) { // Specific event handlers that have custom logic - ctx.Step(`^the flag should be part of the event payload$`, state.assertFlagInChangeEvent) + ctx.Step(`^the flag should be part of the event payload$`, + withStateNoArgs((*TestState).assertFlagInChangeEvent)) // Generic wildcard event handler patterns - future-proof - ctx.Step(`^a (\w+) event handler$`, state.addGenericEventHandler) - ctx.Step(`^a (\w+) event was fired$`, state.waitForGenericEvent) - ctx.Step(`^the (\w+) event handler should have been executed$`, state.assertGenericEventExecuted) + ctx.Step(`^a (\w+) event handler$`, + withState1Arg((*TestState).addGenericEventHandler)) + ctx.Step(`^a (\w+) event was fired$`, + withState1Arg((*TestState).waitForGenericEvent)) + ctx.Step(`^the (\w+) event handler should have been executed$`, + withState1Arg((*TestState).assertGenericEventExecuted)) // Missing step definition - added as stub - ctx.Step(`^the (\w+) event handler should have been executed within (\d+)ms$`, state.assertGenericEventExecutedWithin) + ctx.Step(`^the (\w+) event handler should have been executed within (\d+)ms$`, + withStateStringAndInt((*TestState).assertGenericEventExecutedWithin)) } -// Specific event handlers consolidated into generic handlers above +// Additional helper for string + int arguments +func withStateStringAndInt(fn func(*TestState, context.Context, string, int) error) func(context.Context, string, int) error { + return func(ctx context.Context, arg1 string, arg2 int) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, arg1, arg2) + } +} + +// State methods - these now expect context as first parameter after state // assertFlagInChangeEvent verifies that the current flag is in the change event payload -func (s *TestState) assertFlagInChangeEvent() error { +func (s *TestState) assertFlagInChangeEvent(ctx context.Context) error { if s.FlagKey == "" { return fmt.Errorf("no flag key set for verification") } @@ -60,7 +77,7 @@ func (s *TestState) assertFlagInChangeEvent() error { func (s *TestState) clearEvents() { // Clear the last event s.LastEvent = nil - + // Drain the channel for { select { @@ -73,11 +90,10 @@ func (s *TestState) clearEvents() { } } - // Generic event handler functions - consolidated and future-proof // addGenericEventHandler adds a handler for any event type -func (s *TestState) addGenericEventHandler(eventType string) error { +func (s *TestState) addGenericEventHandler(ctx context.Context, eventType string) error { if s.Client == nil { return fmt.Errorf("no client available to add %s event handler", eventType) } @@ -106,13 +122,13 @@ func (s *TestState) addGenericEventHandler(eventType string) error { } // waitForGenericEvent waits for any event type to be fired -func (s *TestState) waitForGenericEvent(eventType string) error { +func (s *TestState) waitForGenericEvent(ctx context.Context, eventType string) error { timeout := 5 * time.Second return s.waitForEvents(strings.ToUpper(eventType), timeout) } // assertGenericEventExecuted verifies that any event type was received -func (s *TestState) assertGenericEventExecuted(eventType string) error { +func (s *TestState) assertGenericEventExecuted(ctx context.Context, eventType string) error { return s.assertEventOccurred(strings.ToUpper(eventType)) } @@ -123,11 +139,8 @@ func (s *TestState) handleProviderStateChange(eventType string) func(openfeature } } - -// Missing step definition implementation - added as stub that throws error - // assertGenericEventExecutedWithin checks if any event was executed within specified time -func (s *TestState) assertGenericEventExecutedWithin(eventType string, timeoutMs int) error { +func (s *TestState) assertGenericEventExecutedWithin(ctx context.Context, eventType string, timeoutMs int) error { timeout := time.Duration(timeoutMs) * time.Millisecond return s.waitForEvents(strings.ToUpper(eventType), timeout) } @@ -141,7 +154,7 @@ func (s *TestState) addEvent(eventType string, details openfeature.EventDetails) Timestamp: time.Now(), Details: details, } - + // Send to channel for immediate notification (non-blocking) select { case s.EventChannel <- event: @@ -172,7 +185,7 @@ func (s *TestState) waitForEvents(eventType string, maxWait time.Duration) error // assertEventOccurred checks if a specific event occurred (with immediate timeout) func (s *TestState) assertEventOccurred(eventType string) error { - return s.waitForEvents(eventType, 100*time.Millisecond) + return s.waitForEvents(eventType, 10*time.Second) } // waitForEventWithPayload waits for a specific event type and validates its payload diff --git a/tests/flagd/testframework/flag_steps.go b/tests/flagd/testframework/flag_steps.go index 47efad340..99018b904 100644 --- a/tests/flagd/testframework/flag_steps.go +++ b/tests/flagd/testframework/flag_steps.go @@ -9,26 +9,51 @@ import ( "github.com/open-feature/go-sdk/openfeature" ) -// initializeFlagSteps registers flag evaluation step definitions -func initializeFlagSteps(ctx *godog.ScenarioContext, state *TestState) { - ctx.Step(`^a ([^-]*)-flag with key "([^"]*)" and a default value "([^"]*)"$`, state.setFlagForEvaluation) - ctx.Step(`^the flag was evaluated with details$`, state.evaluateFlagWithDetails) - ctx.Step(`^the resolved details value should be "([^"]*)"$`, state.assertResolvedValue) - ctx.Step(`^the reason should be "([^"]*)"$`, state.assertReason) - ctx.Step(`^the error-code should be "([^"]*)"$`, state.assertErrorCode) - ctx.Step(`^the flag should be part of the event payload$`, state.assertFlagInEventPayload) - ctx.Step(`^the flag was modified$`, state.modifyFlag) - ctx.Step(`^a change event was fired$`, state.triggerChangeEvent) - ctx.Step(`^the variant should be "([^"]*)"$`, state.assertVariant) - ctx.Step(`^the resolved details value should be "{"([^"]*)": true, "([^"]*)": "([^"]*)", "([^"]*)": (\d+)\.(\d+) }"$`, state.assertComplexValue) +// InitializeFlagSteps registers flag evaluation step definitions +func InitializeFlagSteps(ctx *godog.ScenarioContext) { + ctx.Step(`^a ([^-]*)-flag with key "([^"]*)" and a default value "([^"]*)"$`, + withState3Args((*TestState).setFlagForEvaluation)) + ctx.Step(`^the flag was evaluated with details$`, + withStateNoArgs((*TestState).evaluateFlagWithDetails)) + ctx.Step(`^the resolved details value should be "([^"]*)"$`, + withState1Arg((*TestState).assertResolvedValue)) + ctx.Step(`^the reason should be "([^"]*)"$`, + withState1Arg((*TestState).assertReason)) + ctx.Step(`^the error-code should be "([^"]*)"$`, + withState1Arg((*TestState).assertErrorCode)) + ctx.Step(`^the flag should be part of the event payload$`, + withStateNoArgs((*TestState).assertFlagInEventPayload)) + ctx.Step(`^the flag was modified$`, + withStateNoArgs((*TestState).modifyFlag)) + ctx.Step(`^a change event was fired$`, + withStateNoArgs((*TestState).triggerChangeEvent)) + ctx.Step(`^the variant should be "([^"]*)"$`, + withState1Arg((*TestState).assertVariant)) + ctx.Step(`^the resolved details value should be "{"([^"]*)": true, "([^"]*)": "([^"]*)", "([^"]*)": (\d+)\.(\d+) }"$`, + withStateComplexValue((*TestState).assertComplexValue)) // Missing step definitions - added as stubs - ctx.Step(`^the resolved metadata is empty$`, state.assertResolvedMetadataIsEmpty) - ctx.Step(`^the resolved metadata should contain$`, state.assertResolvedMetadataContains) + ctx.Step(`^the resolved metadata is empty$`, + withStateNoArgs((*TestState).assertResolvedMetadataIsEmpty)) + ctx.Step(`^the resolved metadata should contain$`, + withStateTable((*TestState).assertResolvedMetadataContains)) } +// Additional helper for complex value assertion +func withStateComplexValue(fn func(*TestState, context.Context, string, string, string, string, int, int) error) func(context.Context, string, string, string, string, int, int) error { + return func(ctx context.Context, key1, key2, value2, key3 string, intPart, fracPart int) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, key1, key2, value2, key3, intPart, fracPart) + } +} + +// State methods - these now expect context as first parameter after state + // setFlagForEvaluation prepares a flag for evaluation -func (s *TestState) setFlagForEvaluation(flagType, flagKey, defaultValue string) error { +func (s *TestState) setFlagForEvaluation(ctx context.Context, flagType, flagKey, defaultValue string) error { s.FlagType = flagType s.FlagKey = flagKey @@ -48,7 +73,7 @@ func (s *TestState) convertDefaultValue(flagType, value string) (interface{}, er } // evaluateFlagWithDetails evaluates the current flag with details -func (s *TestState) evaluateFlagWithDetails() error { +func (s *TestState) evaluateFlagWithDetails(ctx context.Context) error { if s.Client == nil { return fmt.Errorf("no client available for evaluation") } @@ -61,8 +86,6 @@ func (s *TestState) evaluateFlagWithDetails() error { evalCtx := openfeature.NewEvaluationContext(s.TargetingKey, s.EvalContext) // Evaluate based on flag type - ctx := context.Background() - switch s.FlagType { case "Boolean": if defaultVal, ok := s.DefaultValue.(bool); ok { @@ -143,7 +166,7 @@ func (s *TestState) evaluateFlagWithDetails() error { } // assertResolvedValue checks that the resolved value matches expected -func (s *TestState) assertResolvedValue(expectedValue string) error { +func (s *TestState) assertResolvedValue(ctx context.Context, expectedValue string) error { if s.LastEvaluation.FlagKey == "" { return fmt.Errorf("no evaluation details available") } @@ -172,7 +195,7 @@ func (s *TestState) assertResolvedValue(expectedValue string) error { } // assertReason checks that the evaluation reason matches expected -func (s *TestState) assertReason(expectedReason string) error { +func (s *TestState) assertReason(ctx context.Context, expectedReason string) error { if s.LastEvaluation.FlagKey == "" { return fmt.Errorf("no evaluation details available") } @@ -186,7 +209,7 @@ func (s *TestState) assertReason(expectedReason string) error { } // assertErrorCode checks that the error code matches expected -func (s *TestState) assertErrorCode(expectedCode string) error { +func (s *TestState) assertErrorCode(ctx context.Context, expectedCode string) error { if s.LastEvaluation.FlagKey == "" { return fmt.Errorf("no evaluation details available") } @@ -207,13 +230,13 @@ func (s *TestState) assertErrorCode(expectedCode string) error { } // assertFlagInEventPayload checks that the current flag is in the latest change event -func (s *TestState) assertFlagInEventPayload() error { +func (s *TestState) assertFlagInEventPayload(ctx context.Context) error { // Use the improved channel-based implementation from event_steps.go - return s.assertFlagInChangeEvent() + return s.assertFlagInChangeEvent(ctx) } // modifyFlag modifies the current flag (typically by calling testbed API) -func (s *TestState) modifyFlag() error { +func (s *TestState) modifyFlag(ctx context.Context) error { if s.Container == nil { return fmt.Errorf("no container available to modify flags") } @@ -227,7 +250,7 @@ func (s *TestState) modifyFlag() error { } // triggerChangeEvent triggers a flag change event -func (s *TestState) triggerChangeEvent() error { +func (s *TestState) triggerChangeEvent(ctx context.Context) error { // Add change event handler handler := func(details openfeature.EventDetails) { s.addEvent("CONFIGURATION_CHANGE", details) @@ -332,7 +355,7 @@ func (s *TestState) evaluateFloatFlag(flagKey string, defaultValue float64, eval } // assertVariant checks that the evaluation result has the expected variant -func (s *TestState) assertVariant(expectedVariant string) error { +func (s *TestState) assertVariant(ctx context.Context, expectedVariant string) error { if s.LastEvaluation.Variant != expectedVariant { return fmt.Errorf("expected variant %s, got %s", expectedVariant, s.LastEvaluation.Variant) } @@ -340,7 +363,7 @@ func (s *TestState) assertVariant(expectedVariant string) error { } // assertComplexValue checks a complex object value with specific structure -func (s *TestState) assertComplexValue(key1 string, key2 string, value2 string, key3 string, intPart int, fracPart int) error { +func (s *TestState) assertComplexValue(ctx context.Context, key1 string, key2 string, value2 string, key3 string, intPart int, fracPart int) error { // For now, this is a placeholder that always passes // In a real implementation, you'd parse the JSON object from the evaluation result return nil @@ -349,12 +372,12 @@ func (s *TestState) assertComplexValue(key1 string, key2 string, value2 string, // Missing step definition implementations - added as stubs that throw errors // assertResolvedMetadataIsEmpty checks if resolved metadata is empty -func (s *TestState) assertResolvedMetadataIsEmpty() error { +func (s *TestState) assertResolvedMetadataIsEmpty(ctx context.Context) error { return fmt.Errorf("UNIMPLEMENTED: assertResolvedMetadataIsEmpty") } // assertResolvedMetadataContains checks if resolved metadata contains specific values -func (s *TestState) assertResolvedMetadataContains(table *godog.Table) error { +func (s *TestState) assertResolvedMetadataContains(ctx context.Context, table *godog.Table) error { if len(table.Rows) < 2 { return fmt.Errorf("table must have at least header row and one data row") } diff --git a/tests/flagd/testframework/provider_steps.go b/tests/flagd/testframework/provider_steps.go index fdbc9a7bf..2a500b204 100644 --- a/tests/flagd/testframework/provider_steps.go +++ b/tests/flagd/testframework/provider_steps.go @@ -1,6 +1,7 @@ package testframework import ( + "context" "fmt" "strings" "time" @@ -18,14 +19,18 @@ func SetProviderSuppliers(rpc, inProcess, file ProviderSupplier) { FileProviderSupplier = file } -// initializeProviderSteps registers provider lifecycle step definitions -func initializeProviderSteps(ctx *godog.ScenarioContext, state *TestState) { - ctx.Step(`^the connection is lost for (\d+)s$`, state.simulateConnectionLoss) +// InitializeProviderSteps registers provider lifecycle step definitions +func InitializeProviderSteps(ctx *godog.ScenarioContext) { + ctx.Step(`^the connection is lost for (\d+)s$`, + withStateIntArg((*TestState).simulateConnectionLoss)) // Generic provider step definition - accepts any provider type including "stable" - ctx.Step(`^a (\w+) flagd provider$`, state.createSpecializedFlagdProvider) + ctx.Step(`^a (\w+) flagd provider$`, + withState1Arg((*TestState).createSpecializedFlagdProvider)) } +// State methods - these now expect context as first parameter after state + // createProviderInstance creates and initializes a flagd provider (formerly createStableFlagdProvider) func (s *TestState) createProviderInstance() error { // Apply defaults if not set @@ -83,7 +88,7 @@ func (s *TestState) waitForProviderReady(timeout time.Duration) error { } // Use generic event handler infrastructure - if err := s.addGenericEventHandler("ready"); err != nil { + if err := s.addGenericEventHandler(context.Background(), "ready"); err != nil { return fmt.Errorf("failed to add ready event handler: %w", err) } @@ -91,10 +96,8 @@ func (s *TestState) waitForProviderReady(timeout time.Duration) error { return s.waitForEvents("READY", timeout) } -// addReadyEventHandler adds a handler for provider ready events - // simulateConnectionLoss simulates connection loss for specified duration -func (s *TestState) simulateConnectionLoss(seconds int) error { +func (s *TestState) simulateConnectionLoss(ctx context.Context, seconds int) error { if s.Container == nil { return fmt.Errorf("no container available to simulate connection loss") } @@ -103,10 +106,8 @@ func (s *TestState) simulateConnectionLoss(seconds int) error { return s.Container.Restart(seconds) } -// Missing step definition implementation - normalized with placeholder - // createSpecializedFlagdProvider creates specialized flagd providers based on type -func (s *TestState) createSpecializedFlagdProvider(providerType string) error { +func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, providerType string) error { // Apply specialized configuration based on provider type if err := s.applySpecializedConfig(providerType); err != nil { return fmt.Errorf("failed to apply specialized config for %s provider: %w", providerType, err) @@ -171,7 +172,6 @@ func (s *TestState) configureUnavailableProvider() error { func (s *TestState) configureSocketProvider() error { // Configure for unix socket connection - s.addProviderOption("socketPath", "String", "/tmp/flagd.sock") s.addProviderOption("port", "Integer", "0") // Disable port when using socket return nil diff --git a/tests/flagd/testframework/step_definitions.go b/tests/flagd/testframework/step_definitions.go index a746b90a4..f6a220d0d 100644 --- a/tests/flagd/testframework/step_definitions.go +++ b/tests/flagd/testframework/step_definitions.go @@ -2,87 +2,59 @@ package testframework import ( "context" - "os" - "github.com/cucumber/godog" + "os" + "sync" ) // All type definitions have been moved to types.go for better organization +var scenarioMutex sync.Mutex // InitializeScenario registers all step definitions for gherkin scenarios func InitializeScenario(ctx *godog.ScenarioContext) { - state := &TestState{ - EnvVars: make(map[string]string), - EvalContext: make(map[string]interface{}), - EventChannel: make(chan EventRecord, 100), - } // Configuration steps (existing config_steps.go steps work fine with TestState via context) - InitializeConfigScenario(ctx, state) + InitializeConfigScenario(ctx) // Provider lifecycle steps - initializeProviderSteps(ctx, state) + InitializeProviderSteps(ctx) // Flag evaluation steps - initializeFlagSteps(ctx, state) + InitializeFlagSteps(ctx) // Context management steps - initializeContextSteps(ctx, state) + InitializeContextSteps(ctx) // Event handling steps - initializeEventSteps(ctx, state) + InitializeEventSteps(ctx) // Setup scenario hooks ctx.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { - // Reset state for each scenario - state.resetState() + scenarioMutex.Lock() + defer scenarioMutex.Unlock() + state := &TestState{ + EnvVars: make(map[string]string), + EvalContext: make(map[string]interface{}), + EventChannel: make(chan EventRecord, 100), + } state.ProviderType = ctx.Value("resolver").(ProviderType) state.FlagDir = ctx.Value("flagDir").(string) - // Store state in context for steps that need it + return context.WithValue(ctx, TestStateKey{}, state), nil }) ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { - // Clean up per-scenario state, but keep the container running - state.CleanupEnvironmentVariables() - - // Properly cleanup provider and client to prevent event contamination - state.cleanupProvider() - - // Clear events after provider cleanup to ensure no residual events - state.clearEvents() - - // NOTE: We do NOT stop the container here - it should run for the entire test suite + scenarioMutex.Lock() + defer scenarioMutex.Unlock() + if state, ok := ctx.Value(TestStateKey{}).(*TestState); ok { + state.CleanupEnvironmentVariables() + state.cleanupProvider() + state.clearEvents() + } return ctx, nil }) } -// resetState clears test state between scenarios -func (s *TestState) resetState() { - s.EnvVars = make(map[string]string) - s.LastEvaluation = EvaluationResult{} - s.EvalContext = make(map[string]interface{}) - s.TargetingKey = "" - s.ConfigError = nil - s.FlagKey = "" - s.FlagType = "" - s.DefaultValue = nil - - // Reset config state - s.ProviderOptions = []ProviderOption{} - s.ProviderConfig = ErrorAwareProviderConfiguration{} - - // Create a fresh event channel for this scenario - // This ensures no events from previous scenarios leak through - s.EventChannel = make(chan EventRecord, 100) - s.LastEvent = nil - - // Note: Provider and client cleanup is handled in the After hook - // to ensure proper shutdown sequencing - s.Client = nil - s.Provider = nil -} - // Type conversion utilities are now centralized in utils.go // Legacy compatibility wrappers func convertValueForSteps(value string, valueType string) (interface{}, error) { @@ -119,13 +91,6 @@ func (s *TestState) CleanupEnvironmentVariables() { // cleanupProvider properly shuts down the provider and client to prevent event contamination func (s *TestState) cleanupProvider() { - // Remove all event handlers from client to prevent lingering events - if s.Client != nil { - // Note: OpenFeature Go SDK doesn't have a RemoveAllHandlers method, - // but setting the client to nil will prevent further event handling - s.Client = nil - } - // Shutdown the provider if it has a shutdown method if s.Provider != nil { // Try to cast to common provider interfaces that might have shutdown methods diff --git a/tests/flagd/testframework/testbed_runner.go b/tests/flagd/testframework/testbed_runner.go index 586b49275..fcb07ec83 100644 --- a/tests/flagd/testframework/testbed_runner.go +++ b/tests/flagd/testframework/testbed_runner.go @@ -143,46 +143,6 @@ func (tr *TestbedRunner) SetupContainer(ctx context.Context) error { return nil } -// RunGherkinTests executes gherkin tests against the testbed -func (tr *TestbedRunner) RunGherkinTests(featurePaths []string, tags string) error { - if tr.container == nil { - return fmt.Errorf("container not initialized") - } - - // Setup provider suppliers for the integration package - SetProviderSuppliers( - tr.createRPCProviderSupplier(), - tr.createInProcessProviderSupplier(), - tr.createFileProviderSupplier(), - ) - - for i, path := range featurePaths { - featurePaths[i] = filepath.Join(tr.testbedDir, path) - } - // Configure godog - opts := godog.Options{ - Format: "pretty", - Paths: featurePaths, - Tags: tags, - Concurrency: 1, - } - - // Create test suite - suite := godog.TestSuite{ - Name: "flagd-e2e", - ScenarioInitializer: tr.initializeScenario, - Options: &opts, - } - - // Run tests - status := suite.Run() - if status != 0 { - return fmt.Errorf("tests failed with status: %d", status) - } - - return nil -} - // RunGherkinTestsWithSubtests executes gherkin tests with individual Go subtests for each scenario // This makes each Gherkin scenario appear as a separate test in IntelliJ func (tr *TestbedRunner) RunGherkinTestsWithSubtests(t *testing.T, featurePaths []string, tags string) error { diff --git a/tests/flagd/testframework/utils.go b/tests/flagd/testframework/utils.go index 2814fc8f9..1b5a7bfa6 100644 --- a/tests/flagd/testframework/utils.go +++ b/tests/flagd/testframework/utils.go @@ -1,6 +1,7 @@ package testframework import ( + "context" "encoding/json" "fmt" "reflect" @@ -168,4 +169,62 @@ func StringToBoolean(str string) bool { } // Global converter instance -var DefaultConverter = NewValueConverter() \ No newline at end of file +var DefaultConverter = NewValueConverter() + +// Helper functions for wrapping state methods +func withStateNoArgs(fn func(*TestState, context.Context) error) func(context.Context) error { + return func(ctx context.Context) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx) + } +} + +func withState2Args(fn func(*TestState, context.Context, string, string) error) func(context.Context, string, string) error { + return func(ctx context.Context, arg1, arg2 string) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, arg1, arg2) + } +} + +func withState3Args(fn func(*TestState, context.Context, string, string, string) error) func(context.Context, string, string, string) error { + return func(ctx context.Context, arg1, arg2, arg3 string) error { + state := GetStateFromContext(ctx) + if state == nil { + return fmt.Errorf("test state not found in context") + } + return fn(state, ctx, arg1, arg2, arg3) + } +} + +func withState3ArgsReturningContext(fn func(*TestState, context.Context, string, string, string) (context.Context, error)) func(context.Context, string, string, string) (context.Context, error) { + return func(ctx context.Context, arg1, arg2, arg3 string) (context.Context, error) { + state := GetStateFromContext(ctx) + if state == nil { + return ctx, fmt.Errorf("test state not found in context") + } + return fn(state, ctx, arg1, arg2, arg3) + } +} + +func withStateNoArgsReturningContext(fn func(*TestState, context.Context) (context.Context, error)) func(context.Context) (context.Context, error) { + return func(ctx context.Context) (context.Context, error) { + state := GetStateFromContext(ctx) + if state == nil { + return ctx, fmt.Errorf("test state not found in context") + } + return fn(state, ctx) + } +} + +func GetStateFromContext(ctx context.Context) *TestState { + if state, ok := ctx.Value(TestStateKey{}).(*TestState); ok { + return state + } + return nil +} From b6cd81238d77f858630bb7c74edd29090596b269 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Sun, 7 Sep 2025 20:07:18 +0200 Subject: [PATCH 2/9] feat(flagd): add eventing with graceperiod fir inprocess resolver Signed-off-by: Simon Schrottner --- providers/flagd/e2e/inprocess_test.go | 2 +- providers/flagd/pkg/configuration.go | 20 +- .../flagd/pkg/service/in_process/grpc_sync.go | 247 ++++++++++++++++++ .../flagd/pkg/service/in_process/service.go | 66 ++++- tests/flagd/testframework/event_steps.go | 2 +- tests/flagd/testframework/provider_steps.go | 6 +- 6 files changed, 331 insertions(+), 12 deletions(-) create mode 100644 providers/flagd/pkg/service/in_process/grpc_sync.go diff --git a/providers/flagd/e2e/inprocess_test.go b/providers/flagd/e2e/inprocess_test.go index 2b0f5a569..6dade95b0 100644 --- a/providers/flagd/e2e/inprocess_test.go +++ b/providers/flagd/e2e/inprocess_test.go @@ -26,7 +26,7 @@ func TestInProcessProviderE2E(t *testing.T) { } // Run tests with in-process specific tags - exclude connection/event issues we won't tackle - tags := "@in-process && ~@unixsocket && ~@targetURI && ~@metadata && ~@grace && ~@customCert && ~@reconnect && ~@contextEnrichment && ~@sync-payload && ~@events" + tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index e41ab4132..60bfde622 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -3,15 +3,15 @@ package flagd import ( "errors" "fmt" - "os" - "strconv" - "strings" - "github.com/go-logr/logr" "github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache" "github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger" "google.golang.org/grpc" + "os" + "strconv" + "strings" + "time" ) type ResolverType string @@ -26,6 +26,7 @@ const ( defaultCache = cache.LRUValue defaultHost = "localhost" defaultResolver = rpc + defaultGracePeriod = 5 * time.Second rpc ResolverType = "rpc" inProcess ResolverType = "in-process" @@ -44,6 +45,7 @@ const ( flagdSourceSelectorEnvironmentVariableName = "FLAGD_SOURCE_SELECTOR" flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI" + flagdGracePeriodVariableName = "FLAGD_GRACE_PERIOD" ) type ProviderConfiguration struct { @@ -64,6 +66,7 @@ type ProviderConfiguration struct { CustomSyncProvider sync.ISync CustomSyncProviderUri string GrpcDialOptionsOverride []grpc.DialOption + GracePeriod time.Duration log logr.Logger } @@ -77,6 +80,7 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration { MaxCacheSize: defaultMaxCacheSize, Resolver: defaultResolver, Tls: defaultTLS, + GracePeriod: defaultGracePeriod, } p.updateFromEnvVar() @@ -224,6 +228,14 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" { cfg.TargetUri = targetUri } + if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" { + if seconds, err := strconv.Atoi(gracePeriod); err == nil { + cfg.GracePeriod = time.Duration(seconds) * time.Second + } else { + // Handle parsing error + cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s': %v", gracePeriod, err)) + } + } } diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go new file mode 100644 index 000000000..a9ccaf6e1 --- /dev/null +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -0,0 +1,247 @@ +package process + +import ( + "buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1" + "context" + "fmt" + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/sync" + grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" + of "github.com/open-feature/go-sdk/openfeature" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/keepalive" + msync "sync" + "time" +) + +const ( + // Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote + // URLs for REST APIs (i.e - HTTP) from GRPC endpoints. + Prefix = "grpc://" + PrefixSecure = "grpcs://" + SupportedScheme = "(envoy|dns|uds|xds)" +) + +// type aliases for interfaces required by this component - needed for mock generation with gomock + +type FlagSyncServiceClient interface { + syncv1grpc.FlagSyncServiceClient +} +type FlagSyncServiceClientResponse interface { + syncv1grpc.FlagSyncService_SyncFlagsClient +} + +var once msync.Once + +type Sync struct { + GrpcDialOptionsOverride []grpc.DialOption + CertPath string + CredentialBuilder grpccredential.Builder + Logger *logger.Logger + ProviderID string + Secure bool + Selector string + URI string + MaxMsgSize int + + client FlagSyncServiceClient + connection *grpc.ClientConn + ready bool + events chan SyncEvent +} + +func (g *Sync) Init(ctx context.Context) error { + var rpcCon *grpc.ClientConn + var err error + + g.events = make(chan SyncEvent) + + if len(g.GrpcDialOptionsOverride) > 0 { + g.Logger.Debug("GRPC DialOptions override provided") + rpcCon, err = grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...) + } else { + // Build dial options with enhanced features + var dialOptions []grpc.DialOption + + // Transport credentials + tCredentials, err := g.CredentialBuilder.Build(g.Secure, g.CertPath) + if err != nil { + err = fmt.Errorf("error building transport credentials: %w", err) + g.Logger.Error(err.Error()) + return err + } + dialOptions = append(dialOptions, grpc.WithTransportCredentials(tCredentials)) + + // Call options + var callOptions []grpc.CallOption + if g.MaxMsgSize > 0 { + callOptions = append(callOptions, grpc.MaxCallRecvMsgSize(g.MaxMsgSize)) + g.Logger.Info(fmt.Sprintf("setting max receive message size %d bytes", g.MaxMsgSize)) + } + if len(callOptions) > 0 { + dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(callOptions...)) + } + + // Keepalive settings + keepaliveParams := keepalive.ClientParameters{ + Time: 30 * time.Second, // Send ping every 30 seconds + Timeout: 5 * time.Second, // Wait 5 seconds for ping response + PermitWithoutStream: true, // Allow pings when no streams active + } + dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) + + // Create connection + rpcCon, err = grpc.NewClient(g.URI, dialOptions...) + } + + if err != nil { + err := fmt.Errorf("error initiating grpc client connection: %w", err) + g.Logger.Error(err.Error()) + return err + } + + // Store connection for state tracking + g.connection = rpcCon + + // Setup service client + g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon) + + // Start connection state monitoring in background + go g.monitorConnectionState(ctx) + + g.Logger.Info(fmt.Sprintf("gRPC client initialized for %s", g.URI)) + return nil +} + +func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { + res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ProviderId: g.ProviderID, Selector: g.Selector}) + if err != nil { + err = fmt.Errorf("error fetching all flags: %w", err) + g.Logger.Error(err.Error()) + return err + } + dataSync <- sync.DataSync{ + FlagData: res.GetFlagConfiguration(), + Source: g.URI, + } + return nil +} + +func (g *Sync) IsReady() bool { + return g.ready +} + +func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + for { + g.Logger.Debug("creating sync stream...") + + // Create sync stream with wait-for-ready - let gRPC handle the connection waiting + syncClient, err := g.client.SyncFlags( + ctx, + &v1.SyncFlagsRequest{ + ProviderId: g.ProviderID, + Selector: g.Selector, + }, + grpc.WaitForReady(true), // gRPC will wait for connection to be ready + ) + if err != nil { + // Check if context is cancelled + if ctx.Err() != nil { + return ctx.Err() + } + + g.Logger.Warn(fmt.Sprintf("failed to create sync stream: %v", err)) + + // Brief pause before retry + select { + case <-time.After(time.Second): + continue + case <-ctx.Done(): + return ctx.Err() + } + } + + g.Logger.Info("sync stream established, starting to receive flags...") + + // Handle the stream - when it breaks, we'll create a new one + err = g.handleFlagSync(syncClient, dataSync) + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + + g.Logger.Warn(fmt.Sprintf("stream closed: %v", err)) + // Loop will automatically create a new stream with wait-for-ready + } + } +} + +// monitorConnectionState monitors connection state changes and logs errors +func (g *Sync) monitorConnectionState(ctx context.Context) { + if g.connection == nil { + return + } + + currentState := g.connection.GetState() + g.Logger.Debug(fmt.Sprintf("starting connection state monitoring, initial state: %s", currentState)) + + for { + // Wait for next state change + if !g.connection.WaitForStateChange(ctx, currentState) { + // Context cancelled, exit monitoring + g.Logger.Debug("connection state monitoring stopped") + return + } + + newState := g.connection.GetState() + g.Logger.Debug(fmt.Sprintf("connection state changed: %s -> %s", currentState, newState)) + + // Log error states + switch newState { + case connectivity.TransientFailure: + g.events <- SyncEvent{event: of.ProviderError} + g.Logger.Error(fmt.Sprintf("gRPC connection entered TransientFailure state for %s", g.URI)) + case connectivity.Shutdown: + g.Logger.Error(fmt.Sprintf("gRPC connection shutdown for %s", g.URI)) + //return // Exit monitoring on shutdown + case connectivity.Ready: + g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI)) + case connectivity.Idle: + g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI)) + case connectivity.Connecting: + g.Logger.Debug(fmt.Sprintf("gRPC connection attempting to connect to %s", g.URI)) + } + + currentState = newState + } +} + +// handleFlagSync wraps the stream listening and push updates through dataSync channel +func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { + once.Do(func() { + g.ready = true + }) + + // Stream message handling loop - receives each individual message from the stream + for { + data, err := stream.Recv() + if err != nil { + return fmt.Errorf("error receiving payload from stream: %w", err) + } + + dataSync <- sync.DataSync{ + FlagData: data.FlagConfiguration, + SyncContext: data.SyncContext, + Source: g.URI, + Selector: g.Selector, + } + + g.Logger.Debug("received full configuration payload") + } +} + +func (g *Sync) Events() chan SyncEvent { + return g.events +} diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index 468b1c64b..de148a719 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -3,6 +3,7 @@ package process import ( "context" "fmt" + "time" "regexp" parallel "sync" @@ -33,6 +34,8 @@ type InProcess struct { sync sync.ISync syncEnd context.CancelFunc wg parallel.WaitGroup + sendReady parallel.Once + configuration Configuration } type Configuration struct { @@ -47,8 +50,17 @@ type Configuration struct { CustomSyncProviderUri string GrpcDialOptionsOverride []googlegrpc.DialOption CertificatePath string + GracePeriod time.Duration } +type EventSync interface { + sync.ISync + Events() chan SyncEvent +} + +type SyncEvent struct { + event of.EventType +} type Shutdowner interface { Shutdown() error } @@ -76,6 +88,7 @@ func NewInProcessService(cfg Configuration) *InProcess { listenerShutdown: make(chan interface{}), serviceMetadata: svcMetadata, sync: iSync, + configuration: cfg, } } @@ -88,7 +101,54 @@ func (i *InProcess) Init() error { return err } - initOnce := parallel.Once{} + if eventSync, ok := i.sync.(EventSync); ok { + go func() { + var staleTimer *time.Timer + var staleTimerMu parallel.Mutex + + for { + select { + case <-ctx.Done(): + return + case msg := <-eventSync.Events(): + switch msg.event { + case of.ProviderError: + i.events <- of.Event{ + ProviderName: "flagd", + EventType: of.ProviderStale, + ProviderEventDetails: of.ProviderEventDetails{Message: "connection error"}, + } + i.sendReady = parallel.Once{} + + // Start stale timer (cancel existing one if running) + staleTimerMu.Lock() + if staleTimer != nil { + staleTimer.Stop() + } + staleTimer = time.AfterFunc(i.configuration.GracePeriod, func() { // n seconds + i.events <- of.Event{ + ProviderName: "flagd", + EventType: of.ProviderError, + ProviderEventDetails: of.ProviderEventDetails{Message: "provider error"}, + } + }) + staleTimerMu.Unlock() + + case of.ProviderReady: + // Cancel stale timer if running + staleTimerMu.Lock() + if staleTimer != nil { + staleTimer.Stop() + staleTimer = nil + } + staleTimerMu.Unlock() + } + } + } + }() + } + + i.sendReady = parallel.Once{} syncInitSuccess := make(chan interface{}) syncInitErr := make(chan error) @@ -118,7 +178,7 @@ func (i *InProcess) Init() error { ProviderName: "flagd", EventType: of.ProviderError, ProviderEventDetails: of.ProviderEventDetails{Message: "Error from flag sync " + err.Error()}} } - initOnce.Do(func() { + i.sendReady.Do(func() { i.events <- of.Event{ProviderName: "flagd", EventType: of.ProviderReady} syncInitSuccess <- nil }) @@ -320,7 +380,7 @@ func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string log.Info("operating in in-process mode with flags sourced from " + uri) - return &grpc.Sync{ + return &Sync{ CredentialBuilder: &credentials.CredentialBuilder{}, GrpcDialOptionsOverride: cfg.GrpcDialOptionsOverride, Logger: log, diff --git a/tests/flagd/testframework/event_steps.go b/tests/flagd/testframework/event_steps.go index c428192d0..71ce012fc 100644 --- a/tests/flagd/testframework/event_steps.go +++ b/tests/flagd/testframework/event_steps.go @@ -183,7 +183,7 @@ func (s *TestState) waitForEvents(eventType string, maxWait time.Duration) error } } -// assertEventOccurred checks if a specific event occurred (with immediate timeout) +// assertEventOccurred checks if a specific event occurred func (s *TestState) assertEventOccurred(eventType string) error { return s.waitForEvents(eventType, 10*time.Second) } diff --git a/tests/flagd/testframework/provider_steps.go b/tests/flagd/testframework/provider_steps.go index 2a500b204..0c90af1a0 100644 --- a/tests/flagd/testframework/provider_steps.go +++ b/tests/flagd/testframework/provider_steps.go @@ -82,13 +82,13 @@ func (s *TestState) createProviderInstance() error { } // waitForProviderReady waits for the provider to be in READY state -func (s *TestState) waitForProviderReady(timeout time.Duration) error { +func (s *TestState) waitForProviderReady(ctx context.Context, timeout time.Duration) error { if s.Client == nil { return fmt.Errorf("no client available to wait for provider ready") } // Use generic event handler infrastructure - if err := s.addGenericEventHandler(context.Background(), "ready"); err != nil { + if err := s.addGenericEventHandler(ctx, "ready"); err != nil { return fmt.Errorf("failed to add ready event handler: %w", err) } @@ -130,7 +130,7 @@ func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, provider } // Wait for provider to be ready - return s.waitForProviderReady(15 * time.Second) + return s.waitForProviderReady(ctx, 15*time.Second) } return nil } From 5155a86171f84a89c3c8096a81473f86d8f45045 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Sun, 7 Sep 2025 20:46:42 +0200 Subject: [PATCH 3/9] fixup: improve configuration Signed-off-by: Simon Schrottner --- providers/flagd/pkg/configuration.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index 60bfde622..3cf7203f0 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -233,7 +233,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { cfg.GracePeriod = time.Duration(seconds) * time.Second } else { // Handle parsing error - cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s': %v", gracePeriod, err)) + cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod)) } } @@ -409,3 +409,10 @@ func WithGrpcDialOptionsOverride(grpcDialOptionsOverride []grpc.DialOption) Prov p.GrpcDialOptionsOverride = grpcDialOptionsOverride } } + +// WithGracePeriod allows to set a time window for the transition from stale to error state +func WithGracePeriod(gracePeriod time.Duration) ProviderOption { + return func(p *ProviderConfiguration) { + p.GracePeriod = gracePeriod + } +} From 56aa6f468b1a5c578b20c3f991100ef1d2d6902d Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Mon, 8 Sep 2025 21:18:34 +0200 Subject: [PATCH 4/9] fixup: improve code Signed-off-by: Simon Schrottner --- providers/flagd/go.mod | 1 - providers/flagd/pkg/configuration.go | 17 +- providers/flagd/pkg/provider.go | 1 + .../flagd/pkg/service/in_process/grpc_sync.go | 403 +++++++++---- .../flagd/pkg/service/in_process/service.go | 538 ++++++++++++------ tests/flagd/testframework/config_steps.go | 1 - tests/flagd/testframework/provider_steps.go | 5 + tests/flagd/testframework/step_definitions.go | 12 +- 8 files changed, 666 insertions(+), 312 deletions(-) diff --git a/providers/flagd/go.mod b/providers/flagd/go.mod index f633674a3..79eff963e 100644 --- a/providers/flagd/go.mod +++ b/providers/flagd/go.mod @@ -15,7 +15,6 @@ require ( github.com/open-feature/flagd/core v0.12.1 github.com/open-feature/go-sdk v1.15.1 github.com/open-feature/go-sdk-contrib/tests/flagd v0.0.0 - github.com/testcontainers/testcontainers-go v0.32.0 go.uber.org/mock v0.5.2 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index 3cf7203f0..859eb80e4 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -11,7 +11,6 @@ import ( "os" "strconv" "strings" - "time" ) type ResolverType string @@ -26,7 +25,7 @@ const ( defaultCache = cache.LRUValue defaultHost = "localhost" defaultResolver = rpc - defaultGracePeriod = 5 * time.Second + defaultGracePeriod = 5 rpc ResolverType = "rpc" inProcess ResolverType = "in-process" @@ -45,7 +44,7 @@ const ( flagdSourceSelectorEnvironmentVariableName = "FLAGD_SOURCE_SELECTOR" flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI" - flagdGracePeriodVariableName = "FLAGD_GRACE_PERIOD" + flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD" ) type ProviderConfiguration struct { @@ -66,7 +65,7 @@ type ProviderConfiguration struct { CustomSyncProvider sync.ISync CustomSyncProviderUri string GrpcDialOptionsOverride []grpc.DialOption - GracePeriod time.Duration + RetryGracePeriod int log logr.Logger } @@ -80,7 +79,7 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration { MaxCacheSize: defaultMaxCacheSize, Resolver: defaultResolver, Tls: defaultTLS, - GracePeriod: defaultGracePeriod, + RetryGracePeriod: defaultGracePeriod, } p.updateFromEnvVar() @@ -230,7 +229,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { } if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" { if seconds, err := strconv.Atoi(gracePeriod); err == nil { - cfg.GracePeriod = time.Duration(seconds) * time.Second + cfg.RetryGracePeriod = seconds } else { // Handle parsing error cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod)) @@ -410,9 +409,9 @@ func WithGrpcDialOptionsOverride(grpcDialOptionsOverride []grpc.DialOption) Prov } } -// WithGracePeriod allows to set a time window for the transition from stale to error state -func WithGracePeriod(gracePeriod time.Duration) ProviderOption { +// WithRetryGracePeriod allows to set a time window for the transition from stale to error state +func WithRetryGracePeriod(gracePeriod int) ProviderOption { return func(p *ProviderConfiguration) { - p.GracePeriod = gracePeriod + p.RetryGracePeriod = gracePeriod } } diff --git a/providers/flagd/pkg/provider.go b/providers/flagd/pkg/provider.go index 2b0084c32..93917b604 100644 --- a/providers/flagd/pkg/provider.go +++ b/providers/flagd/pkg/provider.go @@ -72,6 +72,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { CustomSyncProvider: provider.providerConfiguration.CustomSyncProvider, CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri, GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride, + RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod, }) } else { service = process.NewInProcessService(process.Configuration{ diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go index a9ccaf6e1..8b40f97ab 100644 --- a/providers/flagd/pkg/service/in_process/grpc_sync.go +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -22,20 +22,27 @@ const ( Prefix = "grpc://" PrefixSecure = "grpcs://" SupportedScheme = "(envoy|dns|uds|xds)" -) -// type aliases for interfaces required by this component - needed for mock generation with gomock + // Default timeouts and retry intervals + defaultRetryDelay = 1 * time.Second + defaultKeepaliveTime = 30 * time.Second + defaultKeepaliveTimeout = 5 * time.Second +) +// Type aliases for interfaces required by this component - needed for mock generation with gomock type FlagSyncServiceClient interface { syncv1grpc.FlagSyncServiceClient } + type FlagSyncServiceClientResponse interface { syncv1grpc.FlagSyncService_SyncFlagsClient } var once msync.Once +// Sync implements gRPC-based flag synchronization with improved context cancellation and error handling type Sync struct { + // Configuration GrpcDialOptionsOverride []grpc.DialOption CertPath string CredentialBuilder grpccredential.Builder @@ -46,141 +53,253 @@ type Sync struct { URI string MaxMsgSize int - client FlagSyncServiceClient - connection *grpc.ClientConn - ready bool - events chan SyncEvent + // Runtime state + client FlagSyncServiceClient + connection *grpc.ClientConn + ready bool + events chan SyncEvent + shutdownComplete chan struct{} + shutdownOnce msync.Once } +// Init initializes the gRPC connection and starts background monitoring func (g *Sync) Init(ctx context.Context) error { - var rpcCon *grpc.ClientConn - var err error + g.Logger.Info(fmt.Sprintf("initializing gRPC client for %s", g.URI)) - g.events = make(chan SyncEvent) + // Initialize channels + g.shutdownComplete = make(chan struct{}) + g.events = make(chan SyncEvent, 10) // Buffered to prevent blocking - if len(g.GrpcDialOptionsOverride) > 0 { - g.Logger.Debug("GRPC DialOptions override provided") - rpcCon, err = grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...) - } else { - // Build dial options with enhanced features - var dialOptions []grpc.DialOption - - // Transport credentials - tCredentials, err := g.CredentialBuilder.Build(g.Secure, g.CertPath) - if err != nil { - err = fmt.Errorf("error building transport credentials: %w", err) - g.Logger.Error(err.Error()) - return err - } - dialOptions = append(dialOptions, grpc.WithTransportCredentials(tCredentials)) + // Establish gRPC connection + conn, err := g.createConnection() + if err != nil { + return fmt.Errorf("failed to create gRPC connection: %w", err) + } - // Call options - var callOptions []grpc.CallOption - if g.MaxMsgSize > 0 { - callOptions = append(callOptions, grpc.MaxCallRecvMsgSize(g.MaxMsgSize)) - g.Logger.Info(fmt.Sprintf("setting max receive message size %d bytes", g.MaxMsgSize)) - } - if len(callOptions) > 0 { - dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(callOptions...)) - } + g.connection = conn + g.client = syncv1grpc.NewFlagSyncServiceClient(conn) - // Keepalive settings - keepaliveParams := keepalive.ClientParameters{ - Time: 30 * time.Second, // Send ping every 30 seconds - Timeout: 5 * time.Second, // Wait 5 seconds for ping response - PermitWithoutStream: true, // Allow pings when no streams active - } - dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) + // Start connection state monitoring in background + go g.monitorConnectionState(ctx) + + g.Logger.Info(fmt.Sprintf("gRPC client initialized successfully for %s", g.URI)) + return nil +} - // Create connection - rpcCon, err = grpc.NewClient(g.URI, dialOptions...) +// createConnection creates and configures the gRPC connection +func (g *Sync) createConnection() (*grpc.ClientConn, error) { + if len(g.GrpcDialOptionsOverride) > 0 { + g.Logger.Debug("using provided gRPC DialOptions override") + return grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...) } + // Build standard dial options + dialOptions, err := g.buildDialOptions() if err != nil { - err := fmt.Errorf("error initiating grpc client connection: %w", err) - g.Logger.Error(err.Error()) - return err + return nil, fmt.Errorf("failed to build dial options: %w", err) } - // Store connection for state tracking - g.connection = rpcCon + return grpc.NewClient(g.URI, dialOptions...) +} - // Setup service client - g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon) +// buildDialOptions constructs the standard gRPC dial options +func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) { + var dialOptions []grpc.DialOption - // Start connection state monitoring in background - go g.monitorConnectionState(ctx) + // Transport credentials + tCredentials, err := g.CredentialBuilder.Build(g.Secure, g.CertPath) + if err != nil { + return nil, fmt.Errorf("failed to build transport credentials: %w", err) + } + dialOptions = append(dialOptions, grpc.WithTransportCredentials(tCredentials)) - g.Logger.Info(fmt.Sprintf("gRPC client initialized for %s", g.URI)) - return nil + // Call options for message size + if g.MaxMsgSize > 0 { + callOptions := []grpc.CallOption{grpc.MaxCallRecvMsgSize(g.MaxMsgSize)} + dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(callOptions...)) + g.Logger.Info(fmt.Sprintf("setting max receive message size to %d bytes", g.MaxMsgSize)) + } + + // Keepalive settings for connection health + keepaliveParams := keepalive.ClientParameters{ + Time: defaultKeepaliveTime, + Timeout: defaultKeepaliveTimeout, + PermitWithoutStream: true, + } + dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) + + return dialOptions, nil } +// ReSync performs a one-time fetch of all flags func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { - res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ProviderId: g.ProviderID, Selector: g.Selector}) + g.Logger.Debug("performing ReSync - fetching all flags") + + res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ + ProviderId: g.ProviderID, + Selector: g.Selector, + }) if err != nil { - err = fmt.Errorf("error fetching all flags: %w", err) - g.Logger.Error(err.Error()) - return err + return fmt.Errorf("failed to fetch all flags: %w", err) } - dataSync <- sync.DataSync{ + + select { + case dataSync <- sync.DataSync{ FlagData: res.GetFlagConfiguration(), Source: g.URI, + }: + g.Logger.Debug("ReSync completed successfully") + return nil + case <-ctx.Done(): + return ctx.Err() } - return nil } +// IsReady returns whether the sync is ready to serve requests func (g *Sync) IsReady() bool { return g.ready } +// Sync starts the continuous flag synchronization process with improved context handling func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + g.Logger.Info("starting continuous flag synchronization") + + // Ensure shutdown completion is signaled when THIS method exits + defer g.markShutdownComplete() + for { - g.Logger.Debug("creating sync stream...") - - // Create sync stream with wait-for-ready - let gRPC handle the connection waiting - syncClient, err := g.client.SyncFlags( - ctx, - &v1.SyncFlagsRequest{ - ProviderId: g.ProviderID, - Selector: g.Selector, - }, - grpc.WaitForReady(true), // gRPC will wait for connection to be ready - ) - if err != nil { - // Check if context is cancelled + // Check for cancellation before each iteration + select { + case <-ctx.Done(): + g.Logger.Info("sync stopped due to context cancellation") + return ctx.Err() + default: + // Continue with sync logic + } + + // Attempt to create sync stream + if err := g.performSyncCycle(ctx, dataSync); err != nil { if ctx.Err() != nil { + g.Logger.Info("sync cycle failed due to context cancellation") return ctx.Err() } - g.Logger.Warn(fmt.Sprintf("failed to create sync stream: %v", err)) + g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err)) - // Brief pause before retry + // Wait before retry with cancellation support select { - case <-time.After(time.Second): + case <-time.After(defaultRetryDelay): continue case <-ctx.Done(): + g.Logger.Info("sync stopped during retry delay due to context cancellation") return ctx.Err() } } + } +} + +// performSyncCycle handles a single sync cycle (create stream, handle messages, cleanup) +func (g *Sync) performSyncCycle(ctx context.Context, dataSync chan<- sync.DataSync) error { + g.Logger.Debug("creating new sync stream") + + // Create sync stream with wait-for-ready to handle connection issues gracefully + stream, err := g.client.SyncFlags( + ctx, + &v1.SyncFlagsRequest{ + ProviderId: g.ProviderID, + Selector: g.Selector, + }, + grpc.WaitForReady(true), + ) + if err != nil { + return fmt.Errorf("failed to create sync stream: %w", err) + } - g.Logger.Info("sync stream established, starting to receive flags...") + g.Logger.Info("sync stream established, starting to receive flags") - // Handle the stream - when it breaks, we'll create a new one - err = g.handleFlagSync(syncClient, dataSync) - if err != nil { - if ctx.Err() != nil { - return ctx.Err() + // Handle the stream with proper context cancellation + return g.handleFlagSync(ctx, stream, dataSync) +} + +// handleFlagSync processes messages from the sync stream with proper context handling +func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { + // Mark as ready on first successful stream + once.Do(func() { + g.ready = true + g.Logger.Info("sync service is now ready") + }) + + // Create channels for stream communication + streamChan := make(chan *v1.SyncFlagsResponse, 1) + errChan := make(chan error, 1) + + // Start goroutine to receive from stream + go func() { + defer close(streamChan) + defer close(errChan) + + for { + data, err := stream.Recv() + if err != nil { + select { + case errChan <- err: + case <-ctx.Done(): + } + return + } + + select { + case streamChan <- data: + case <-ctx.Done(): + return + } + } + }() + + // Main message handling loop with proper cancellation support + for { + select { + case data, ok := <-streamChan: + if !ok { + return fmt.Errorf("stream channel closed") } - g.Logger.Warn(fmt.Sprintf("stream closed: %v", err)) - // Loop will automatically create a new stream with wait-for-ready + if err := g.processFlagData(ctx, data, dataSync); err != nil { + return err + } + + case err := <-errChan: + return fmt.Errorf("stream error: %w", err) + + case <-ctx.Done(): + g.Logger.Info("handleFlagSync stopped due to context cancellation") + return ctx.Err() } } } -// monitorConnectionState monitors connection state changes and logs errors +// processFlagData handles individual flag configuration updates +func (g *Sync) processFlagData(ctx context.Context, data *v1.SyncFlagsResponse, dataSync chan<- sync.DataSync) error { + syncData := sync.DataSync{ + FlagData: data.FlagConfiguration, + SyncContext: data.SyncContext, + Source: g.URI, + Selector: g.Selector, + } + + select { + case dataSync <- syncData: + g.Logger.Debug("successfully processed flag configuration update") + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// monitorConnectionState monitors gRPC connection state changes with improved cancellation handling func (g *Sync) monitorConnectionState(ctx context.Context) { if g.connection == nil { + g.Logger.Warn("no connection available for state monitoring") return } @@ -188,60 +307,102 @@ func (g *Sync) monitorConnectionState(ctx context.Context) { g.Logger.Debug(fmt.Sprintf("starting connection state monitoring, initial state: %s", currentState)) for { - // Wait for next state change + // Wait for state change with context support if !g.connection.WaitForStateChange(ctx, currentState) { - // Context cancelled, exit monitoring - g.Logger.Debug("connection state monitoring stopped") + g.Logger.Debug("connection state monitoring stopped due to context cancellation") + return + } + + // Check for cancellation + select { + case <-ctx.Done(): + g.Logger.Debug("connection state monitoring stopped due to context cancellation") return + default: } newState := g.connection.GetState() g.Logger.Debug(fmt.Sprintf("connection state changed: %s -> %s", currentState, newState)) - // Log error states - switch newState { - case connectivity.TransientFailure: - g.events <- SyncEvent{event: of.ProviderError} - g.Logger.Error(fmt.Sprintf("gRPC connection entered TransientFailure state for %s", g.URI)) - case connectivity.Shutdown: - g.Logger.Error(fmt.Sprintf("gRPC connection shutdown for %s", g.URI)) - //return // Exit monitoring on shutdown - case connectivity.Ready: - g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI)) - case connectivity.Idle: - g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI)) - case connectivity.Connecting: - g.Logger.Debug(fmt.Sprintf("gRPC connection attempting to connect to %s", g.URI)) - } - + // Handle state-specific logic + g.handleConnectionState(ctx, newState) currentState = newState } } -// handleFlagSync wraps the stream listening and push updates through dataSync channel -func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { - once.Do(func() { - g.ready = true - }) +// handleConnectionState processes specific connection state changes +func (g *Sync) handleConnectionState(ctx context.Context, state connectivity.State) { + switch state { + case connectivity.TransientFailure: + g.Logger.Error(fmt.Sprintf("gRPC connection entered TransientFailure state for %s", g.URI)) + g.sendEvent(ctx, SyncEvent{event: of.ProviderError}) - // Stream message handling loop - receives each individual message from the stream - for { - data, err := stream.Recv() - if err != nil { - return fmt.Errorf("error receiving payload from stream: %w", err) - } + case connectivity.Shutdown: + g.Logger.Error(fmt.Sprintf("gRPC connection shutdown for %s", g.URI)) - dataSync <- sync.DataSync{ - FlagData: data.FlagConfiguration, - SyncContext: data.SyncContext, - Source: g.URI, - Selector: g.Selector, - } + case connectivity.Ready: + g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI)) + g.sendEvent(ctx, SyncEvent{event: of.ProviderReady}) + + case connectivity.Idle: + g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI)) + + case connectivity.Connecting: + g.Logger.Debug(fmt.Sprintf("gRPC connection attempting to connect to %s", g.URI)) + } +} - g.Logger.Debug("received full configuration payload") +// sendEvent safely sends events with cancellation support +func (g *Sync) sendEvent(ctx context.Context, event SyncEvent) { + select { + case g.events <- event: + // Event sent successfully + case <-ctx.Done(): + // Context cancelled, don't block + default: + // Channel full, log warning but don't block + g.Logger.Warn("event channel full, dropping event") } } +// markShutdownComplete signals that shutdown has completed +func (g *Sync) markShutdownComplete() { + g.shutdownOnce.Do(func() { + close(g.shutdownComplete) + g.Logger.Debug("shutdown completion signaled") + }) +} + +// Events returns the channel for sync events func (g *Sync) Events() chan SyncEvent { return g.events } + +// Shutdown gracefully shuts down the sync service +func (g *Sync) Shutdown() error { + g.Logger.Info("shutting down gRPC sync service") + + // Wait for shutdown completion with timeout + select { + case <-g.shutdownComplete: + g.Logger.Info("sync operations completed gracefully") + case <-time.After(5 * time.Second): + g.Logger.Warn("shutdown timeout exceeded - forcing close") + } + + // Close events channel + if g.events != nil { + close(g.events) + } + + // Close gRPC connection + if g.connection != nil { + if err := g.connection.Close(); err != nil { + g.Logger.Error(fmt.Sprintf("error closing gRPC connection: %v", err)) + return err + } + } + + g.Logger.Info("gRPC sync service shutdown completed successfully") + return nil +} diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index de148a719..ae27899db 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -3,10 +3,9 @@ package process import ( "context" "fmt" - "time" - "regexp" - parallel "sync" + "sync" + "time" "go.uber.org/zap" googlegrpc "google.golang.org/grpc" @@ -15,7 +14,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/model" "github.com/open-feature/flagd/core/pkg/store" - "github.com/open-feature/flagd/core/pkg/sync" + isync "github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/flagd/core/pkg/sync/file" "github.com/open-feature/flagd/core/pkg/sync/grpc" "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" @@ -23,21 +22,84 @@ import ( "golang.org/x/exp/maps" ) +const ( + // Channel buffer sizes + eventChannelBuffer = 5 + syncChannelBuffer = 1 + + // Provider name for events + providerName = "flagd" +) + // InProcess service implements flagd flag evaluation in-process. // Flag configurations are obtained from supported sources. type InProcess struct { - evaluator evaluator.IEvaluator - events chan of.Event - listenerShutdown chan interface{} - logger *logger.Logger - serviceMetadata model.Metadata - sync sync.ISync - syncEnd context.CancelFunc - wg parallel.WaitGroup - sendReady parallel.Once - configuration Configuration + // Core components + evaluator evaluator.IEvaluator + syncProvider isync.ISync + logger *logger.Logger + configuration Configuration + serviceMetadata model.Metadata + + // Event handling + events chan of.Event + eventSync EventSync + + // Shutdown coordination + ctx context.Context + cancelFunc context.CancelFunc + shutdownChannels *shutdownChannels + wg sync.WaitGroup + shutdownOnce sync.Once + + // Stateless coordination using sync.Once + initOnce sync.Once + sendReadyOnNextData sync.Once + staleTimer *staleTimer +} + +// shutdownChannels groups all shutdown-related channels +type shutdownChannels struct { + listenerShutdown chan struct{} + syncData chan isync.DataSync + initSuccess chan struct{} + initError chan error +} + +// staleTimer manages the stale connection timer with thread safety +type staleTimer struct { + timer *time.Timer + mu sync.Mutex +} + +// newStaleTimer creates a new thread-safe stale timer +func newStaleTimer() *staleTimer { + return &staleTimer{} +} + +// start starts or restarts the stale timer +func (st *staleTimer) start(duration time.Duration, callback func()) { + st.mu.Lock() + defer st.mu.Unlock() + + if st.timer != nil { + st.timer.Stop() + } + st.timer = time.AfterFunc(duration, callback) +} + +// stop stops the stale timer +func (st *staleTimer) stop() { + st.mu.Lock() + defer st.mu.Unlock() + + if st.timer != nil { + st.timer.Stop() + st.timer = nil + } } +// Configuration holds all configuration for the InProcess service type Configuration struct { Host any Port any @@ -46,177 +108,309 @@ type Configuration struct { Selector string TLSEnabled bool OfflineFlagSource string - CustomSyncProvider sync.ISync + CustomSyncProvider isync.ISync CustomSyncProviderUri string GrpcDialOptionsOverride []googlegrpc.DialOption CertificatePath string - GracePeriod time.Duration + RetryGracePeriod int } +// EventSync interface for sync providers that support events type EventSync interface { - sync.ISync + isync.ISync Events() chan SyncEvent } +// SyncEvent represents an event from the sync provider type SyncEvent struct { event of.EventType } + +// Shutdowner interface for graceful shutdown type Shutdowner interface { Shutdown() error } +// NewInProcessService creates a new InProcess service with the given configuration func NewInProcessService(cfg Configuration) *InProcess { log := logger.NewLogger(NewRaw(), false) + syncProvider, uri := createSyncProvider(cfg, log) - iSync, uri := makeSyncProvider(cfg, log) + flagStore := store.NewFlags() + flagStore.FlagSources = append(flagStore.FlagSources, uri) - // service specific metadata - svcMetadata := make(model.Metadata, 2) + return &InProcess{ + evaluator: evaluator.NewJSON(log, flagStore), + syncProvider: syncProvider, + logger: log, + configuration: cfg, + serviceMetadata: createServiceMetadata(cfg), + events: make(chan of.Event, eventChannelBuffer), + staleTimer: newStaleTimer(), + sendReadyOnNextData: sync.Once{}, // Armed and ready to fire on first data + } +} + +// createServiceMetadata builds the service metadata from configuration +func createServiceMetadata(cfg Configuration) model.Metadata { + metadata := make(model.Metadata, 2) if cfg.Selector != "" { - svcMetadata["scope"] = cfg.Selector + metadata["scope"] = cfg.Selector } if cfg.ProviderID != "" { - svcMetadata["providerID"] = cfg.ProviderID + metadata["providerID"] = cfg.ProviderID } + return metadata +} - flagStore := store.NewFlags() - flagStore.FlagSources = append(flagStore.FlagSources, uri) - return &InProcess{ - evaluator: evaluator.NewJSON(log, flagStore), - events: make(chan of.Event, 5), - logger: log, - listenerShutdown: make(chan interface{}), - serviceMetadata: svcMetadata, - sync: iSync, - configuration: cfg, +// Init initializes the service and starts all background processes +func (i *InProcess) Init() error { + i.logger.Info("initializing InProcess service") + + // Setup context and shutdown channels + i.setupShutdownInfrastructure() + + // Initialize sync provider + if err := i.syncProvider.Init(i.ctx); err != nil { + return fmt.Errorf("failed to initialize sync provider: %w", err) } + + // Start background processes + i.startEventSyncMonitor() + i.startDataSyncProcess() + i.startDataSyncListener() + + // Wait for initialization to complete + return i.waitForInitialization() } -func (i *InProcess) Init() error { - var ctx context.Context - ctx, i.syncEnd = context.WithCancel(context.Background()) +// setupShutdownInfrastructure initializes context and channels for coordinated shutdown +func (i *InProcess) setupShutdownInfrastructure() { + i.ctx, i.cancelFunc = context.WithCancel(context.Background()) + i.shutdownChannels = &shutdownChannels{ + listenerShutdown: make(chan struct{}), + syncData: make(chan isync.DataSync, syncChannelBuffer), + initSuccess: make(chan struct{}), + initError: make(chan error, 1), + } +} - err := i.sync.Init(ctx) - if err != nil { - return err - } - - if eventSync, ok := i.sync.(EventSync); ok { - go func() { - var staleTimer *time.Timer - var staleTimerMu parallel.Mutex - - for { - select { - case <-ctx.Done(): - return - case msg := <-eventSync.Events(): - switch msg.event { - case of.ProviderError: - i.events <- of.Event{ - ProviderName: "flagd", - EventType: of.ProviderStale, - ProviderEventDetails: of.ProviderEventDetails{Message: "connection error"}, - } - i.sendReady = parallel.Once{} - - // Start stale timer (cancel existing one if running) - staleTimerMu.Lock() - if staleTimer != nil { - staleTimer.Stop() - } - staleTimer = time.AfterFunc(i.configuration.GracePeriod, func() { // n seconds - i.events <- of.Event{ - ProviderName: "flagd", - EventType: of.ProviderError, - ProviderEventDetails: of.ProviderEventDetails{Message: "provider error"}, - } - }) - staleTimerMu.Unlock() - - case of.ProviderReady: - // Cancel stale timer if running - staleTimerMu.Lock() - if staleTimer != nil { - staleTimer.Stop() - staleTimer = nil - } - staleTimerMu.Unlock() - } - } - } - }() - } - - i.sendReady = parallel.Once{} - syncInitSuccess := make(chan interface{}) - syncInitErr := make(chan error) - - syncChan := make(chan sync.DataSync, 1) - - // start data sync +// startEventSyncMonitor starts monitoring events from EventSync providers +func (i *InProcess) startEventSyncMonitor() { + eventSync, ok := i.syncProvider.(EventSync) + if !ok { + return // No event monitoring needed + } + + i.eventSync = eventSync + go i.runEventSyncMonitor() +} + +// runEventSyncMonitor handles events from the sync provider +func (i *InProcess) runEventSyncMonitor() { + i.logger.Debug("starting event sync monitor") + defer i.logger.Debug("event sync monitor stopped") + + for { + select { + case <-i.ctx.Done(): + return + case <-i.shutdownChannels.listenerShutdown: + return + case msg := <-i.eventSync.Events(): + i.handleSyncEvent(msg) + } + } +} + +// handleSyncEvent processes individual sync events +func (i *InProcess) handleSyncEvent(event SyncEvent) { + switch event.event { + case of.ProviderError: + i.handleProviderError() + // Reset the sync.Once so it can fire again on recovery + i.sendReadyOnNextData = sync.Once{} + case of.ProviderReady: + i.handleProviderReady() + } +} + +// handleProviderError handles provider error events by starting stale timer +func (i *InProcess) handleProviderError() { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderStale, + ProviderEventDetails: of.ProviderEventDetails{Message: "connection error"}, + } + + // Start stale timer - when it expires, send error event + i.staleTimer.start(time.Duration(i.configuration.RetryGracePeriod)*time.Second, func() { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderError, + ProviderEventDetails: of.ProviderEventDetails{Message: "provider error"}, + } + }) +} + +// handleProviderReady handles provider ready events by stopping stale timer +func (i *InProcess) handleProviderReady() { + i.staleTimer.stop() +} + +// startDataSyncProcess starts the main data synchronization goroutine +func (i *InProcess) startDataSyncProcess() { i.wg.Add(1) - go func() { - defer i.wg.Done() - err := i.sync.Sync(ctx, syncChan) - if err != nil { - syncInitErr <- err + go i.runDataSyncProcess() +} + +// runDataSyncProcess runs the main sync process and handles errors appropriately +func (i *InProcess) runDataSyncProcess() { + defer i.wg.Done() + i.logger.Debug("starting data sync process") + defer i.logger.Debug("data sync process stopped") + + err := i.syncProvider.Sync(i.ctx, i.shutdownChannels.syncData) + if err != nil && i.ctx.Err() == nil { + // Only report non-cancellation errors + select { + case i.shutdownChannels.initError <- err: + default: + // Don't block if channel is full or no reader } - }() + } +} - // start data sync listener and listen to listener shutdown hook +// startDataSyncListener starts the data sync listener goroutine +func (i *InProcess) startDataSyncListener() { i.wg.Add(1) - go func() { - defer i.wg.Done() - for { - select { - case data := <-syncChan: - // re-syncs are ignored as we only support single flag sync source - changes, _, err := i.evaluator.SetState(data) - if err != nil { - i.events <- of.Event{ - ProviderName: "flagd", EventType: of.ProviderError, - ProviderEventDetails: of.ProviderEventDetails{Message: "Error from flag sync " + err.Error()}} - } - i.sendReady.Do(func() { - i.events <- of.Event{ProviderName: "flagd", EventType: of.ProviderReady} - syncInitSuccess <- nil - }) - i.events <- of.Event{ - ProviderName: "flagd", EventType: of.ProviderConfigChange, - ProviderEventDetails: of.ProviderEventDetails{Message: "New flag sync", FlagChanges: maps.Keys(changes)}} - case <-i.listenerShutdown: - i.logger.Info("Shutting down data sync listener") - if shutdowner, ok := i.sync.(Shutdowner); ok { - err := shutdowner.Shutdown() - if err != nil { - i.logger.Error("Error shutdown sync provider", zap.Error(err)) - } - } - return - } + go i.runDataSyncListener() +} + +// runDataSyncListener processes incoming sync data and handles shutdown +func (i *InProcess) runDataSyncListener() { + defer i.wg.Done() + i.logger.Debug("starting data sync listener") + defer i.logger.Debug("data sync listener stopped") + + for { + select { + case data := <-i.shutdownChannels.syncData: + i.processSyncData(data) + + case <-i.ctx.Done(): + i.logger.Info("data sync listener stopping due to context cancellation") + i.shutdownSyncProvider() + return + + case <-i.shutdownChannels.listenerShutdown: + i.logger.Info("data sync listener stopping due to shutdown signal") + i.shutdownSyncProvider() + return + } + } +} + +// processSyncData handles individual sync data updates +func (i *InProcess) processSyncData(data isync.DataSync) { + changes, _, err := i.evaluator.SetState(data) + if err != nil { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderError, + ProviderEventDetails: of.ProviderEventDetails{Message: "Error from flag sync " + err.Error()}, + } + return + } + + i.logger.Info("staletimer stop") + // Stop stale timer - we've successfully received and processed data + i.staleTimer.stop() + + // Send ready event using sync.Once - handles initial ready and recovery automatically + i.sendReadyOnNextData.Do(func() { + i.events <- of.Event{ProviderName: providerName, EventType: of.ProviderReady} + }) + + // Handle initialization completion (only happens once ever) + i.initOnce.Do(func() { + close(i.shutdownChannels.initSuccess) + }) + + // Send config change event for data updates + if len(changes) > 0 { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderConfigChange, + ProviderEventDetails: of.ProviderEventDetails{ + Message: "New flag sync", + FlagChanges: maps.Keys(changes), + }, + } + } +} + +// shutdownSyncProvider gracefully shuts down the sync provider +func (i *InProcess) shutdownSyncProvider() { + if shutdowner, ok := i.syncProvider.(Shutdowner); ok { + if err := shutdowner.Shutdown(); err != nil { + i.logger.Error("error shutting down sync provider", zap.Error(err)) } - }() + } +} - // wait for initialization or error +// waitForInitialization waits for the service to initialize or fail +func (i *InProcess) waitForInitialization() error { select { - case <-syncInitSuccess: + case <-i.shutdownChannels.initSuccess: + i.logger.Info("InProcess service initialized successfully") return nil - case err := <-syncInitErr: - return err + case err := <-i.shutdownChannels.initError: + return fmt.Errorf("initialization failed: %w", err) } } +// Shutdown gracefully shuts down the service func (i *InProcess) Shutdown() { - i.syncEnd() - close(i.listenerShutdown) - i.wg.Wait() + i.shutdownOnce.Do(func() { + i.logger.Info("starting InProcess service shutdown") + + // Stop stale timer + i.staleTimer.stop() + + // Cancel context to signal all goroutines + if i.cancelFunc != nil { + i.cancelFunc() + } + + // Close shutdown channels + if i.shutdownChannels != nil { + close(i.shutdownChannels.listenerShutdown) + } + + i.logger.Info("waiting for background processes to complete") + i.wg.Wait() + i.logger.Info("InProcess service shutdown completed successfully") + }) +} + +// EventChannel returns the event channel for external consumers +func (i *InProcess) EventChannel() <-chan of.Event { + return i.events } -func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue bool, - evalCtx map[string]interface{}) of.BoolResolutionDetail { +// appendMetadata adds service metadata to evaluation metadata +func (i *InProcess) appendMetadata(evalMetadata model.Metadata) { + for k, v := range i.serviceMetadata { + evalMetadata[k] = v + } +} + +// ResolveBoolean resolves a boolean flag value +func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue bool, evalCtx map[string]interface{}) of.BoolResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveBooleanValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.BoolResolutionDetail{ Value: defaultValue, @@ -239,10 +433,11 @@ func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue } } -func (i *InProcess) ResolveString(ctx context.Context, key string, defaultValue string, - evalCtx map[string]interface{}) of.StringResolutionDetail { +// ResolveString resolves a string flag value +func (i *InProcess) ResolveString(ctx context.Context, key string, defaultValue string, evalCtx map[string]interface{}) of.StringResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveStringValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.StringResolutionDetail{ Value: defaultValue, @@ -265,10 +460,11 @@ func (i *InProcess) ResolveString(ctx context.Context, key string, defaultValue } } -func (i *InProcess) ResolveFloat(ctx context.Context, key string, defaultValue float64, - evalCtx map[string]interface{}) of.FloatResolutionDetail { +// ResolveFloat resolves a float flag value +func (i *InProcess) ResolveFloat(ctx context.Context, key string, defaultValue float64, evalCtx map[string]interface{}) of.FloatResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveFloatValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.FloatResolutionDetail{ Value: defaultValue, @@ -291,10 +487,11 @@ func (i *InProcess) ResolveFloat(ctx context.Context, key string, defaultValue f } } -func (i *InProcess) ResolveInt(ctx context.Context, key string, defaultValue int64, - evalCtx map[string]interface{}) of.IntResolutionDetail { +// ResolveInt resolves an integer flag value +func (i *InProcess) ResolveInt(ctx context.Context, key string, defaultValue int64, evalCtx map[string]interface{}) of.IntResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveIntValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.IntResolutionDetail{ Value: defaultValue, @@ -317,10 +514,11 @@ func (i *InProcess) ResolveInt(ctx context.Context, key string, defaultValue int } } -func (i *InProcess) ResolveObject(ctx context.Context, key string, defaultValue interface{}, - evalCtx map[string]interface{}) of.InterfaceResolutionDetail { +// ResolveObject resolves an object flag value +func (i *InProcess) ResolveObject(ctx context.Context, key string, defaultValue interface{}, evalCtx map[string]interface{}) of.InterfaceResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveObjectValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.InterfaceResolutionDetail{ Value: defaultValue, @@ -343,42 +541,25 @@ func (i *InProcess) ResolveObject(ctx context.Context, key string, defaultValue } } -func (i *InProcess) EventChannel() <-chan of.Event { - return i.events -} - -func (i *InProcess) appendMetadata(evalMetadata model.Metadata) { - // For a nil slice, the number of iterations is 0 - for k, v := range i.serviceMetadata { - evalMetadata[k] = v - } -} - -// makeSyncProvider is a helper to create sync.ISync and return the underlying uri used by it to the caller -func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string) { +// createSyncProvider creates the appropriate sync provider based on configuration +func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, string) { if cfg.CustomSyncProvider != nil { - log.Info("operating in in-process mode with a custom sync provider at " + cfg.CustomSyncProviderUri) + log.Info("using custom sync provider at " + cfg.CustomSyncProviderUri) return cfg.CustomSyncProvider, cfg.CustomSyncProviderUri } if cfg.OfflineFlagSource != "" { - // file sync provider - log.Info("operating in in-process mode with offline flags sourced from " + cfg.OfflineFlagSource) + log.Info("using file sync provider with source: " + cfg.OfflineFlagSource) return &file.Sync{ URI: cfg.OfflineFlagSource, Logger: log, - Mux: ¶llel.RWMutex{}, + Mux: &sync.RWMutex{}, }, cfg.OfflineFlagSource } - // grpc sync provider (default uri based on `dns`) - uri := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) - - if cfg.TargetUri != "" && isValidTargetScheme(cfg.TargetUri) { - uri = cfg.TargetUri - } - - log.Info("operating in in-process mode with flags sourced from " + uri) + // Default to gRPC sync provider + uri := buildGrpcUri(cfg) + log.Info("using gRPC sync provider with URI: " + uri) return &Sync{ CredentialBuilder: &credentials.CredentialBuilder{}, @@ -392,7 +573,15 @@ func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string }, uri } -// mapError is a helper to map evaluation errors to OF errors +// buildGrpcUri constructs the gRPC URI from configuration +func buildGrpcUri(cfg Configuration) string { + if cfg.TargetUri != "" && isValidTargetScheme(cfg.TargetUri) { + return cfg.TargetUri + } + return fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) +} + +// mapError maps evaluation errors to OpenFeature errors func mapError(flagKey string, err error) of.ResolutionError { switch err.Error() { case model.FlagNotFoundErrorCode: @@ -408,6 +597,7 @@ func mapError(flagKey string, err error) of.ResolutionError { } } +// isValidTargetScheme validates the gRPC target URI scheme func isValidTargetScheme(targetUri string) bool { regx := regexp.MustCompile("^" + grpc.SupportedScheme) return regx.Match([]byte(targetUri)) diff --git a/tests/flagd/testframework/config_steps.go b/tests/flagd/testframework/config_steps.go index c24f4e8b0..ddf7ac492 100644 --- a/tests/flagd/testframework/config_steps.go +++ b/tests/flagd/testframework/config_steps.go @@ -19,7 +19,6 @@ var ignoredOptions = []string{ "keepAliveTime", "retryBackoffMs", "retryBackoffMaxMs", - "retryGracePeriod", "offlinePollIntervalMs", } diff --git a/tests/flagd/testframework/provider_steps.go b/tests/flagd/testframework/provider_steps.go index 58b34cee2..165853b2e 100644 --- a/tests/flagd/testframework/provider_steps.go +++ b/tests/flagd/testframework/provider_steps.go @@ -40,6 +40,11 @@ func (s *TestState) createProviderInstance() error { var provider openfeature.FeatureProvider var err error + s.ProviderOptions = append(s.ProviderOptions, ProviderOption{ + Option: "RetryGracePeriod", + ValueType: "Integer", + Value: "1", + }) switch s.ProviderType { case RPC: if RPCProviderSupplier == nil { diff --git a/tests/flagd/testframework/step_definitions.go b/tests/flagd/testframework/step_definitions.go index e976562f0..3a76e088e 100644 --- a/tests/flagd/testframework/step_definitions.go +++ b/tests/flagd/testframework/step_definitions.go @@ -47,9 +47,9 @@ func InitializeScenario(ctx *godog.ScenarioContext) { scenarioMutex.Lock() defer scenarioMutex.Unlock() if state, ok := ctx.Value(TestStateKey{}).(*TestState); ok { + state.clearEvents() state.CleanupEnvironmentVariables() state.cleanupProvider() - state.clearEvents() } return ctx, nil }) @@ -95,11 +95,11 @@ func (s *TestState) cleanupProvider() { if s.Provider != nil { // Try to cast to common provider interfaces that might have shutdown methods // This is defensive - not all providers will have explicit shutdown - go func() { - if shutdownable, ok := s.Provider.(interface{ Shutdown() }); ok { - shutdownable.Shutdown() - } - }() + + if shutdownable, ok := s.Provider.(interface{ Shutdown() }); ok { + shutdownable.Shutdown() + } + s.Provider = nil } } From d7d9a1601275935aa8b5015c29a0b4c0a87cf835 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Mon, 8 Sep 2025 21:29:40 +0200 Subject: [PATCH 5/9] fixup: fixing test Signed-off-by: Simon Schrottner --- .../service/in_process/service_custom_sync_provider_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go b/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go index 2be9c1801..5471a6a1d 100644 --- a/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go +++ b/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go @@ -9,7 +9,7 @@ func TestInProcessWithCustomSyncProvider(t *testing.T) { service := NewInProcessService(Configuration{CustomSyncProvider: customSyncProvider, CustomSyncProviderUri: "not tested here"}) // If custom sync provider is supplied the in-process service should use it. - if service.sync != customSyncProvider { - t.Fatalf("Expected service.sync to be the mockCustomSyncProvider, but got %s", service.sync) + if service.syncProvider != customSyncProvider { + t.Fatalf("Expected service.sync to be the mockCustomSyncProvider, but got %s", service.syncProvider) } } From 100e82feb9df8ace5205a0cd0c8fea309a1ebfe4 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Tue, 9 Sep 2025 06:49:32 +0200 Subject: [PATCH 6/9] feat: add retry configuration Signed-off-by: Simon Schrottner --- .gitmodules | 1 + .../flagd/pkg/service/in_process/grpc_sync.go | 47 +++++++++++++++---- .../flagd/pkg/service/in_process/service.go | 5 +- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/.gitmodules b/.gitmodules index 0e2445c7d..3f69f7394 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,4 @@ [submodule "providers/flagd/flagd-testbed"] path = providers/flagd/flagd-testbed url = https://github.com/open-feature/flagd-testbed.git + branch = v2.11.1 diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go index 8b40f97ab..30e1e0ffe 100644 --- a/providers/flagd/pkg/service/in_process/grpc_sync.go +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -24,9 +24,43 @@ const ( SupportedScheme = "(envoy|dns|uds|xds)" // Default timeouts and retry intervals - defaultRetryDelay = 1 * time.Second defaultKeepaliveTime = 30 * time.Second defaultKeepaliveTimeout = 5 * time.Second + + retryPolicy = `{ + "methodConfig": [ + { + "name": [ + { + "service": "flagd.sync.v1.FlagSyncService" + } + ], + "retryPolicy": { + "MaxAttempts": 3, + "InitialBackoff": "1s", + "MaxBackoff": "5s", + "BackoffMultiplier": 2.0, + "RetryableStatusCodes": [ + "CANCELLED", + "UNKNOWN", + "INVALID_ARGUMENT", + "NOT_FOUND", + "ALREADY_EXISTS", + "PERMISSION_DENIED", + "RESOURCE_EXHAUSTED", + "FAILED_PRECONDITION", + "ABORTED", + "OUT_OF_RANGE", + "UNIMPLEMENTED", + "INTERNAL", + "UNAVAILABLE", + "DATA_LOSS", + "UNAUTHENTICATED" + ] + } + } + ] + }` ) // Type aliases for interfaces required by this component - needed for mock generation with gomock @@ -128,6 +162,8 @@ func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) { } dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) + dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(retryPolicy)) + return dialOptions, nil } @@ -185,13 +221,9 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err)) + g.sendEvent(ctx, SyncEvent{event: of.ProviderError}) - // Wait before retry with cancellation support - select { - case <-time.After(defaultRetryDelay): - continue - case <-ctx.Done(): - g.Logger.Info("sync stopped during retry delay due to context cancellation") + if ctx.Err() != nil { return ctx.Err() } } @@ -342,7 +374,6 @@ func (g *Sync) handleConnectionState(ctx context.Context, state connectivity.Sta case connectivity.Ready: g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI)) - g.sendEvent(ctx, SyncEvent{event: of.ProviderReady}) case connectivity.Idle: g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI)) diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index ae27899db..6cb6d0e1b 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -82,10 +82,9 @@ func (st *staleTimer) start(duration time.Duration, callback func()) { st.mu.Lock() defer st.mu.Unlock() - if st.timer != nil { - st.timer.Stop() + if st.timer == nil { + st.timer = time.AfterFunc(duration, callback) } - st.timer = time.AfterFunc(duration, callback) } // stop stops the stale timer From 0e24facd7a2c2e57530aa1399bb13254db6fa96d Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Tue, 9 Sep 2025 06:59:55 +0200 Subject: [PATCH 7/9] fixup: try fixing test execution. seems like startup, etc is taking longer for e2e tests Signed-off-by: Simon Schrottner --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 85ba0c369..a2de2b15c 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ test: # call with TESTCONTAINERS_RYUK_DISABLED="true" to avoid problems with podman on Macs e2e: - go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=1m -tags=e2e {} + go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=2m -tags=e2e {} lint: go install -v github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.8 From 9414c4d7696ffec0796faa760c2f51e252b8557d Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Tue, 9 Sep 2025 07:11:16 +0200 Subject: [PATCH 8/9] fixup: gemini suggestions: Signed-off-by: Simon Schrottner --- providers/flagd/flagd-testbed | 2 +- providers/flagd/pkg/service/in_process/grpc_sync.go | 11 ++--------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/providers/flagd/flagd-testbed b/providers/flagd/flagd-testbed index fdce98780..892889d46 160000 --- a/providers/flagd/flagd-testbed +++ b/providers/flagd/flagd-testbed @@ -1 +1 @@ -Subproject commit fdce98780f5811bd4672fb7f2b56a6be05fc46d2 +Subproject commit 892889d46fcc53ae4ab71d5eafd7f2c5282d48ce diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go index 30e1e0ffe..9b6b93caa 100644 --- a/providers/flagd/pkg/service/in_process/grpc_sync.go +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -17,12 +17,6 @@ import ( ) const ( - // Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote - // URLs for REST APIs (i.e - HTTP) from GRPC endpoints. - Prefix = "grpc://" - PrefixSecure = "grpcs://" - SupportedScheme = "(envoy|dns|uds|xds)" - // Default timeouts and retry intervals defaultKeepaliveTime = 30 * time.Second defaultKeepaliveTimeout = 5 * time.Second @@ -72,8 +66,6 @@ type FlagSyncServiceClientResponse interface { syncv1grpc.FlagSyncService_SyncFlagsClient } -var once msync.Once - // Sync implements gRPC-based flag synchronization with improved context cancellation and error handling type Sync struct { // Configuration @@ -94,6 +86,7 @@ type Sync struct { events chan SyncEvent shutdownComplete chan struct{} shutdownOnce msync.Once + initializer msync.Once } // Init initializes the gRPC connection and starts background monitoring @@ -256,7 +249,7 @@ func (g *Sync) performSyncCycle(ctx context.Context, dataSync chan<- sync.DataSy // handleFlagSync processes messages from the sync stream with proper context handling func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { // Mark as ready on first successful stream - once.Do(func() { + g.initializer.Do(func() { g.ready = true g.Logger.Info("sync service is now ready") }) From bbc626923a6e62a3b62949019289a8ddb4b7dda9 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Tue, 9 Sep 2025 14:50:29 +0200 Subject: [PATCH 9/9] fixup: update harness for better tests Signed-off-by: Simon Schrottner --- providers/flagd/flagd-testbed | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/flagd-testbed b/providers/flagd/flagd-testbed index 892889d46..b62f5dbe8 160000 --- a/providers/flagd/flagd-testbed +++ b/providers/flagd/flagd-testbed @@ -1 +1 @@ -Subproject commit 892889d46fcc53ae4ab71d5eafd7f2c5282d48ce +Subproject commit b62f5dbe860ecf4f36ec757dfdc0b38f7b3dec6e