Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions event_executor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package openfeature

import (
"fmt"
"log/slog"
"maps"
reflect "reflect"
"slices"
"sync"
"time"
)

const defaultDomain = ""
Expand Down Expand Up @@ -237,32 +235,32 @@ func (e *eventExecutor) State(domain string) State {
}

// registerDefaultProvider registers the default FeatureProvider and remove the old default provider if available
func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) error {
func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) {
e.mu.Lock()
defer e.mu.Unlock()

newProvider := newProviderRef(provider)
oldProvider := e.defaultProviderReference
e.defaultProviderReference = newProvider

return e.startListeningAndShutdownOld(newProvider, oldProvider)
e.startListeningAndShutdownOld(newProvider, oldProvider)
}

// registerNamedEventingProvider registers a named FeatureProvider and remove event listener for old named provider
func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, provider FeatureProvider) error {
func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, provider FeatureProvider) {
e.mu.Lock()
defer e.mu.Unlock()
newProvider := newProviderRef(provider)

oldProvider := e.namedProviderReference[associatedClient]
e.namedProviderReference[associatedClient] = newProvider

return e.startListeningAndShutdownOld(newProvider, oldProvider)
e.startListeningAndShutdownOld(newProvider, oldProvider)
}

// startListeningAndShutdownOld is a helper to start concurrent listening to new provider events and invoke shutdown
// hook of the old provider if it's not bound by another subscription
func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) error {
func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) {
// check if this provider already actively handled - 1:N binding capability
if !isRunning(newProvider, e.activeSubscriptions) {
e.activeSubscriptions = append(e.activeSubscriptions, newProvider)
Expand Down Expand Up @@ -292,7 +290,7 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen

// check if this provider is still bound - 1:N binding capability
if isBound(oldReference, e.defaultProviderReference, slices.Collect(maps.Values(e.namedProviderReference))) {
return nil
return
}

// drop from active references
Expand All @@ -303,16 +301,29 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen
_, ok := oldReference.featureProvider.(EventHandler)
if !ok {
// no shutdown for non event handling provider
return nil
return
}

// avoid shutdown lockouts
select {
case oldReference.shutdownSemaphore <- "":
return nil
case <-time.After(200 * time.Millisecond):
return fmt.Errorf("old event handler %s timeout waiting for handler shutdown",
oldReference.featureProvider.Metadata().Name)
return
default:
// This should never happen:
//
// providerReference.shutdownSemaphore is created with
// a buffer size of 1, so it should allow sending at
// least one shutdown signal without blocking. Locking
// should prevent us from sending more than one
// signal.
//
// In the unlikely case that it does not, this
// shouldn't be a big deal: we have already swapped
// references in eventExecutor and openfeatureAPI, and
// the handler should be able to receive at least one
// shutdown signal later.
slog.Info("OF BUG: failed to send shutdown to old event handler",
"provider", oldReference.featureProvider.Metadata().Name)
}
}

Expand Down
76 changes: 20 additions & 56 deletions event_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,13 @@ func TestEventHandler_RegisterUnregisterEventProvider(t *testing.T) {
}

executor := newEventExecutor()
err := executor.registerDefaultProvider(eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerDefaultProvider(eventingProvider)

if executor.defaultProviderReference.featureProvider != eventingProvider {
t.Error("implementation should register default eventing provider")
}

err = executor.registerNamedEventingProvider("domain", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("domain", eventingProvider)

if _, ok := executor.namedProviderReference["domain"]; !ok {
t.Errorf("implementation should register named eventing provider")
Expand Down Expand Up @@ -959,7 +953,6 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
eventingImpl := &ProviderEventing{
c: make(chan Event, 1),
}
eventingImpl.Invoke(Event{EventType: ProviderError})

provider := struct {
FeatureProvider
Expand All @@ -983,6 +976,9 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
AddHandler(ProviderStale, callback)
AddHandler(ProviderConfigChange, callback)

// Emit error event from the provider
eventingImpl.Invoke(Event{EventType: ProviderError})

// assert client transitioned to ERROR
eventually(t, func() bool {
return NewClient().State() == ErrorState
Expand All @@ -1002,7 +998,6 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
eventingImpl := &ProviderEventing{
c: make(chan Event, 1),
}
eventingImpl.Invoke(Event{EventType: ProviderStale})

provider := struct {
FeatureProvider
Expand All @@ -1026,6 +1021,9 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
AddHandler(ProviderError, callback)
AddHandler(ProviderConfigChange, callback)

// Emit stale event from the provider.
eventingImpl.Invoke(Event{EventType: ProviderStale})

// assert client transitioned to STALE
eventually(t, func() bool {
return NewClient().State() == StaleState
Expand Down Expand Up @@ -1191,25 +1189,16 @@ func TestEventHandler_1ToNMapping(t *testing.T) {

executor := newEventExecutor()

err := executor.registerDefaultProvider(eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerDefaultProvider(eventingProvider)

if len(executor.activeSubscriptions) != 1 &&
executor.activeSubscriptions[0].featureProvider != eventingProvider.FeatureProvider {
t.Fatal("provider not added to active provider subscriptions")
}

err = executor.registerNamedEventingProvider("clientA", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", eventingProvider)

err = executor.registerNamedEventingProvider("clientB", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientB", eventingProvider)

if len(executor.activeSubscriptions) != 1 {
t.Fatal("multiple provided in active subscriptions")
Expand All @@ -1229,15 +1218,9 @@ func TestEventHandler_1ToNMapping(t *testing.T) {

executor := newEventExecutor()

err := executor.registerDefaultProvider(eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerDefaultProvider(eventingProvider)

err = executor.registerNamedEventingProvider("clientA", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", eventingProvider)

overridingProvider := struct {
FeatureProvider
Expand All @@ -1249,10 +1232,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
},
}

err = executor.registerNamedEventingProvider("clientA", overridingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", overridingProvider)

if len(executor.activeSubscriptions) != 2 {
t.Fatal("error with active provider subscriptions")
Expand All @@ -1272,15 +1252,9 @@ func TestEventHandler_1ToNMapping(t *testing.T) {

executor := newEventExecutor()

err := executor.registerNamedEventingProvider("clientA", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", eventingProvider)

err = executor.registerNamedEventingProvider("clientB", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientB", eventingProvider)

overridingProvider := struct {
FeatureProvider
Expand All @@ -1292,10 +1266,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
},
}

err = executor.registerNamedEventingProvider("clientA", overridingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", overridingProvider)

if len(executor.activeSubscriptions) != 2 {
t.Fatal("error with active provider subscriptions")
Expand All @@ -1315,10 +1286,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {

executor := newEventExecutor()

err := executor.registerNamedEventingProvider("clientA", eventingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", eventingProvider)

overridingProvider := struct {
FeatureProvider
Expand All @@ -1330,10 +1298,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
},
}

err = executor.registerNamedEventingProvider("clientA", overridingProvider)
if err != nil {
t.Fatal(err)
}
executor.registerNamedEventingProvider("clientA", overridingProvider)

if len(executor.activeSubscriptions) != 1 &&
executor.activeSubscriptions[0].featureProvider != overridingProvider.FeatureProvider {
Expand Down Expand Up @@ -1552,8 +1517,7 @@ func TestEventHandler_ChannelClosure(t *testing.T) {

executor := newEventExecutor()

err := executor.registerDefaultProvider(eventingProvider)
require.NoError(t, err)
executor.registerDefaultProvider(eventingProvider)
require.Len(t, executor.activeSubscriptions, 1)

var eventCount atomic.Int64
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/cucumber/godog v0.15.1
github.com/stretchr/testify v1.11.1
go.uber.org/mock v0.6.0
golang.org/x/sync v0.18.0
golang.org/x/text v0.31.0
golang.org/x/sync v0.19.0
golang.org/x/text v0.33.0
)

require (
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
Loading
Loading