diff --git a/event_executor.go b/event_executor.go index 662fce81..f1efee47 100644 --- a/event_executor.go +++ b/event_executor.go @@ -1,13 +1,11 @@ package openfeature import ( - "fmt" "log/slog" "maps" reflect "reflect" "slices" "sync" - "time" ) const defaultDomain = "" @@ -237,7 +235,7 @@ func (e *eventExecutor) State(domain string) State { } // registerDefaultProvider registers the default FeatureProvider and remove the old default provider if available -func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) error { +func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) { e.mu.Lock() defer e.mu.Unlock() @@ -245,11 +243,11 @@ func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) error oldProvider := e.defaultProviderReference e.defaultProviderReference = newProvider - return e.startListeningAndShutdownOld(newProvider, oldProvider) + e.startListeningAndShutdownOld(newProvider, oldProvider) } // registerNamedEventingProvider registers a named FeatureProvider and remove event listener for old named provider -func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, provider FeatureProvider) error { +func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, provider FeatureProvider) { e.mu.Lock() defer e.mu.Unlock() newProvider := newProviderRef(provider) @@ -257,12 +255,12 @@ func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, p oldProvider := e.namedProviderReference[associatedClient] e.namedProviderReference[associatedClient] = newProvider - return e.startListeningAndShutdownOld(newProvider, oldProvider) + e.startListeningAndShutdownOld(newProvider, oldProvider) } // startListeningAndShutdownOld is a helper to start concurrent listening to new provider events and invoke shutdown // hook of the old provider if it's not bound by another subscription -func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) error { +func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) { // check if this provider already actively handled - 1:N binding capability if !isRunning(newProvider, e.activeSubscriptions) { e.activeSubscriptions = append(e.activeSubscriptions, newProvider) @@ -292,7 +290,7 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen // check if this provider is still bound - 1:N binding capability if isBound(oldReference, e.defaultProviderReference, slices.Collect(maps.Values(e.namedProviderReference))) { - return nil + return } // drop from active references @@ -303,16 +301,29 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen _, ok := oldReference.featureProvider.(EventHandler) if !ok { // no shutdown for non event handling provider - return nil + return } // avoid shutdown lockouts select { case oldReference.shutdownSemaphore <- "": - return nil - case <-time.After(200 * time.Millisecond): - return fmt.Errorf("old event handler %s timeout waiting for handler shutdown", - oldReference.featureProvider.Metadata().Name) + return + default: + // This should never happen: + // + // providerReference.shutdownSemaphore is created with + // a buffer size of 1, so it should allow sending at + // least one shutdown signal without blocking. Locking + // should prevent us from sending more than one + // signal. + // + // In the unlikely case that it does not, this + // shouldn't be a big deal: we have already swapped + // references in eventExecutor and openfeatureAPI, and + // the handler should be able to receive at least one + // shutdown signal later. + slog.Info("OF BUG: failed to send shutdown to old event handler", + "provider", oldReference.featureProvider.Metadata().Name) } } diff --git a/event_executor_test.go b/event_executor_test.go index 0ac726b8..83615a52 100644 --- a/event_executor_test.go +++ b/event_executor_test.go @@ -28,19 +28,13 @@ func TestEventHandler_RegisterUnregisterEventProvider(t *testing.T) { } executor := newEventExecutor() - err := executor.registerDefaultProvider(eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerDefaultProvider(eventingProvider) if executor.defaultProviderReference.featureProvider != eventingProvider { t.Error("implementation should register default eventing provider") } - err = executor.registerNamedEventingProvider("domain", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("domain", eventingProvider) if _, ok := executor.namedProviderReference["domain"]; !ok { t.Errorf("implementation should register named eventing provider") @@ -959,7 +953,6 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) { eventingImpl := &ProviderEventing{ c: make(chan Event, 1), } - eventingImpl.Invoke(Event{EventType: ProviderError}) provider := struct { FeatureProvider @@ -983,6 +976,9 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) { AddHandler(ProviderStale, callback) AddHandler(ProviderConfigChange, callback) + // Emit error event from the provider + eventingImpl.Invoke(Event{EventType: ProviderError}) + // assert client transitioned to ERROR eventually(t, func() bool { return NewClient().State() == ErrorState @@ -1002,7 +998,6 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) { eventingImpl := &ProviderEventing{ c: make(chan Event, 1), } - eventingImpl.Invoke(Event{EventType: ProviderStale}) provider := struct { FeatureProvider @@ -1026,6 +1021,9 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) { AddHandler(ProviderError, callback) AddHandler(ProviderConfigChange, callback) + // Emit stale event from the provider. + eventingImpl.Invoke(Event{EventType: ProviderStale}) + // assert client transitioned to STALE eventually(t, func() bool { return NewClient().State() == StaleState @@ -1191,25 +1189,16 @@ func TestEventHandler_1ToNMapping(t *testing.T) { executor := newEventExecutor() - err := executor.registerDefaultProvider(eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerDefaultProvider(eventingProvider) if len(executor.activeSubscriptions) != 1 && executor.activeSubscriptions[0].featureProvider != eventingProvider.FeatureProvider { t.Fatal("provider not added to active provider subscriptions") } - err = executor.registerNamedEventingProvider("clientA", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", eventingProvider) - err = executor.registerNamedEventingProvider("clientB", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientB", eventingProvider) if len(executor.activeSubscriptions) != 1 { t.Fatal("multiple provided in active subscriptions") @@ -1229,15 +1218,9 @@ func TestEventHandler_1ToNMapping(t *testing.T) { executor := newEventExecutor() - err := executor.registerDefaultProvider(eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerDefaultProvider(eventingProvider) - err = executor.registerNamedEventingProvider("clientA", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", eventingProvider) overridingProvider := struct { FeatureProvider @@ -1249,10 +1232,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) { }, } - err = executor.registerNamedEventingProvider("clientA", overridingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", overridingProvider) if len(executor.activeSubscriptions) != 2 { t.Fatal("error with active provider subscriptions") @@ -1272,15 +1252,9 @@ func TestEventHandler_1ToNMapping(t *testing.T) { executor := newEventExecutor() - err := executor.registerNamedEventingProvider("clientA", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", eventingProvider) - err = executor.registerNamedEventingProvider("clientB", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientB", eventingProvider) overridingProvider := struct { FeatureProvider @@ -1292,10 +1266,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) { }, } - err = executor.registerNamedEventingProvider("clientA", overridingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", overridingProvider) if len(executor.activeSubscriptions) != 2 { t.Fatal("error with active provider subscriptions") @@ -1315,10 +1286,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) { executor := newEventExecutor() - err := executor.registerNamedEventingProvider("clientA", eventingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", eventingProvider) overridingProvider := struct { FeatureProvider @@ -1330,10 +1298,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) { }, } - err = executor.registerNamedEventingProvider("clientA", overridingProvider) - if err != nil { - t.Fatal(err) - } + executor.registerNamedEventingProvider("clientA", overridingProvider) if len(executor.activeSubscriptions) != 1 && executor.activeSubscriptions[0].featureProvider != overridingProvider.FeatureProvider { @@ -1552,8 +1517,7 @@ func TestEventHandler_ChannelClosure(t *testing.T) { executor := newEventExecutor() - err := executor.registerDefaultProvider(eventingProvider) - require.NoError(t, err) + executor.registerDefaultProvider(eventingProvider) require.Len(t, executor.activeSubscriptions, 1) var eventCount atomic.Int64 diff --git a/go.mod b/go.mod index afca89b3..bf8f8c60 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/cucumber/godog v0.15.1 github.com/stretchr/testify v1.11.1 go.uber.org/mock v0.6.0 - golang.org/x/sync v0.18.0 - golang.org/x/text v0.31.0 + golang.org/x/sync v0.19.0 + golang.org/x/text v0.33.0 ) require ( diff --git a/go.sum b/go.sum index 13844aa7..b347a14a 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,12 @@ go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/openfeature_api.go b/openfeature_api.go index b54c3e26..5ff3a70a 100644 --- a/openfeature_api.go +++ b/openfeature_api.go @@ -33,129 +33,97 @@ func newEvaluationAPI(eventExecutor *eventExecutor) *evaluationAPI { } func (api *evaluationAPI) SetProvider(ctx context.Context, provider FeatureProvider) error { - return api.setProviderWithContext(ctx, provider, true) + if provider == nil { + return errors.New("default provider cannot be set to nil") + } + api.setProviderWithContext(ctx, provider) + return nil } func (api *evaluationAPI) SetProviderAndWait(ctx context.Context, provider FeatureProvider) error { - return api.setProviderWithContext(ctx, provider, false) -} - -// GetProviderMetadata returns the default FeatureProvider's metadata -func (api *evaluationAPI) GetProviderMetadata() Metadata { - api.mu.RLock() - defer api.mu.RUnlock() - - return api.defaultProvider.Metadata() -} - -// GetNamedProviderMetadata returns the default FeatureProvider's metadata -func (api *evaluationAPI) GetNamedProviderMetadata(name string) Metadata { - api.mu.RLock() - defer api.mu.RUnlock() - - provider, ok := api.namedProviders[name] - if !ok { - return ProviderMetadata() + if provider == nil { + return errors.New("default provider cannot be set to nil") } - - return provider.Metadata() + initCh := api.setProviderWithContext(ctx, provider) + return <-initCh } // setProviderWithContext sets the default FeatureProvider of the evaluationAPI with context-aware initialization. -func (api *evaluationAPI) setProviderWithContext(ctx context.Context, provider FeatureProvider, async bool) error { +func (api *evaluationAPI) setProviderWithContext(ctx context.Context, provider FeatureProvider) <-chan error { api.mu.Lock() defer api.mu.Unlock() - if provider == nil { - return errors.New("default provider cannot be set to nil") - } - oldProvider := api.defaultProvider api.defaultProvider = provider - err := api.initNewAndShutdownOld(ctx, "", provider, oldProvider, async) - if err != nil { - return fmt.Errorf("failed to initialize default provider %q: %w", provider.Metadata().Name, err) - } + api.eventExecutor.registerDefaultProvider(provider) - err = api.eventExecutor.registerDefaultProvider(provider) - if err != nil { - return fmt.Errorf("failed to register default provider %q: %w", provider.Metadata().Name, err) - } + api.shutdownOld(ctx, oldProvider) - return nil + return api.initNew(ctx, "", provider) } -// setNamedProviderWithContext sets a provider with client name using context-aware initialization. -func (api *evaluationAPI) setNamedProviderWithContext(ctx context.Context, clientName string, provider FeatureProvider, async bool) error { +func (api *evaluationAPI) setNamedProviderWithContext(ctx context.Context, clientName string, provider FeatureProvider) <-chan error { api.mu.Lock() defer api.mu.Unlock() - if provider == nil { - return errors.New("provider cannot be set to nil") - } - - // Initialize new named provider and Shutdown the old one oldProvider := api.namedProviders[clientName] api.namedProviders[clientName] = provider - err := api.initNewAndShutdownOld(ctx, clientName, provider, oldProvider, async) - if err != nil { - return fmt.Errorf("failed to initialize named provider %q for domain %q: %w", provider.Metadata().Name, clientName, err) - } + api.eventExecutor.registerNamedEventingProvider(clientName, provider) - err = api.eventExecutor.registerNamedEventingProvider(clientName, provider) - if err != nil { - return fmt.Errorf("failed to register named provider %q for domain %q: %w", provider.Metadata().Name, clientName, err) - } + api.shutdownOld(ctx, oldProvider) - return nil + return api.initNew(ctx, clientName, provider) } -// SetNamedProviderWithContext sets a provider with client name using context-aware initialization. +// SetNamedProvider sets a provider with client name. Returns an error if FeatureProvider is nil. func (api *evaluationAPI) SetNamedProvider(ctx context.Context, clientName string, provider FeatureProvider) error { - return api.setNamedProviderWithContext(ctx, clientName, provider, true) + if provider == nil { + return errors.New("provider cannot be set to nil") + } + api.setNamedProviderWithContext(ctx, clientName, provider) + return nil } -// SetNamedProviderWithContextAndWait sets a provider with client name using context-aware initialization and waits for completion. +// SetNamedProviderAndWait sets a provider with client name and waits for initialization to complete. func (api *evaluationAPI) SetNamedProviderAndWait(ctx context.Context, clientName string, provider FeatureProvider) error { - return api.setNamedProviderWithContext(ctx, clientName, provider, false) + if provider == nil { + return errors.New("provider cannot be set to nil") + } + initCh := api.setNamedProviderWithContext(ctx, clientName, provider) + return <-initCh } -// initNewAndShutdownOld is the main helper to initialise new FeatureProvider and Shutdown the old FeatureProvider. -// Always uses the context-aware initializer with the provided context. -// -// When shutting down old providers that implement ContextAwareStateHandler, a 10-second timeout -// is applied to prevent hanging if the provider becomes unresponsive during shutdown. -func (api *evaluationAPI) initNewAndShutdownOld(ctx context.Context, clientName string, newProvider FeatureProvider, oldProvider FeatureProvider, async bool) error { - if async { - go func(executor *eventExecutor, evalCtx EvaluationContext, ctx context.Context, provider FeatureProvider, clientName string) { - // for async initialization, error is conveyed as an event - event, _ := initializerWithContext(ctx, provider, evalCtx) - executor.states.Store(clientName, stateFromEventOrError(event, nil)) - executor.triggerEvent(event, provider) - }(api.eventExecutor, api.evalCtx, ctx, newProvider, clientName) - } else { - event, err := initializerWithContext(ctx, newProvider, api.evalCtx) - api.eventExecutor.states.Store(clientName, stateFromEventOrError(event, err)) - api.eventExecutor.triggerEvent(event, newProvider) +func (api *evaluationAPI) initNew(ctx context.Context, clientName string, newProvider FeatureProvider) <-chan error { + errCh := make(chan error, 1) + + go func(executor *eventExecutor, evalCtx EvaluationContext, ctx context.Context, provider FeatureProvider, clientName string) { + event, err := initializerWithContext(ctx, provider, evalCtx) + executor.states.Store(clientName, stateFromEventOrError(event, err)) + executor.triggerEvent(event, provider) if err != nil { - return err + err = fmt.Errorf("failed to initialize named provider %q for domain %q: %w", provider.Metadata().Name, clientName, err) } - } + errCh <- err + }(api.eventExecutor, api.evalCtx, ctx, newProvider, clientName) + return errCh +} + +func (api *evaluationAPI) shutdownOld(ctx context.Context, oldProvider FeatureProvider) { v, ok := oldProvider.(StateHandler) // oldProvider can be nil or without state handling capability if oldProvider == nil || !ok { - return nil + return } namedProviders := slices.Collect(maps.Values(api.namedProviders)) // check for multiple bindings if oldProvider == api.defaultProvider || slices.Contains(namedProviders, oldProvider) { - return nil + return } go func(forShutdown StateHandler, parentCtx context.Context) { @@ -163,8 +131,27 @@ func (api *evaluationAPI) initNewAndShutdownOld(ctx context.Context, clientName slog.Error("async provider shutdown failed", slog.Any("error", err)) } }(v, ctx) +} - return nil +// GetProviderMetadata returns the default FeatureProvider's metadata +func (api *evaluationAPI) GetProviderMetadata() Metadata { + api.mu.RLock() + defer api.mu.RUnlock() + + return api.defaultProvider.Metadata() +} + +// GetNamedProviderMetadata returns the default FeatureProvider's metadata +func (api *evaluationAPI) GetNamedProviderMetadata(name string) Metadata { + api.mu.RLock() + defer api.mu.RUnlock() + + provider, ok := api.namedProviders[name] + if !ok { + return ProviderMetadata() + } + + return provider.Metadata() } // GetNamedProviders returns named providers map. diff --git a/reference.go b/reference.go index 046b421d..8c0b8554 100644 --- a/reference.go +++ b/reference.go @@ -9,7 +9,7 @@ func newProviderRef(provider FeatureProvider) providerReference { return providerReference{ featureProvider: provider, kind: reflect.TypeOf(provider).Kind(), - shutdownSemaphore: make(chan any), + shutdownSemaphore: make(chan any, 1), } }