Skip to content

Commit e07a5e4

Browse files
authored
fix: Handle closed EventChannel from multiprovider (#461)
Signed-off-by: Saleem Rashid <dev@saleemrashid.com>
1 parent 4ee43eb commit e07a5e4

File tree

3 files changed

+65
-17
lines changed

3 files changed

+65
-17
lines changed

openfeature/event_executor.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -222,25 +222,25 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen
222222
if !isRunning(newProvider, e.activeSubscriptions) {
223223
e.activeSubscriptions = append(e.activeSubscriptions, newProvider)
224224

225-
go func() {
226-
v, ok := newProvider.featureProvider.(EventHandler)
227-
if !ok {
228-
return
229-
}
230-
231-
// event handling of the new feature provider
232-
for {
233-
select {
234-
case event := <-v.EventChannel():
235-
e.eventChan <- eventPayload{
236-
event: event,
237-
handler: newProvider.featureProvider,
225+
if v, ok := newProvider.featureProvider.(EventHandler); ok {
226+
go func() {
227+
// event handling of the new feature provider
228+
for {
229+
select {
230+
case event, ok := <-v.EventChannel():
231+
if !ok {
232+
return
233+
}
234+
e.eventChan <- eventPayload{
235+
event: event,
236+
handler: newProvider.featureProvider,
237+
}
238+
case <-newProvider.shutdownSemaphore:
239+
return
238240
}
239-
case <-newProvider.shutdownSemaphore:
240-
return
241241
}
242-
}
243-
}()
242+
}()
243+
}
244244
}
245245

246246
// shutdown old provider handling

openfeature/event_executor_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"errors"
55
"reflect"
66
"slices"
7+
"sync/atomic"
78
"testing"
89
"time"
10+
11+
"github.com/stretchr/testify/require"
912
)
1013

1114
func init() {
@@ -1541,3 +1544,44 @@ func TestEventHandler_APIRemoval(t *testing.T) {
15411544
executor.RemoveClientHandler("a", ProviderReady, &h1)
15421545
})
15431546
}
1547+
1548+
func TestEventHandler_ChannelClosure(t *testing.T) {
1549+
eventingImpl := &ProviderEventing{
1550+
c: make(chan Event, 1),
1551+
}
1552+
1553+
eventingProvider := struct {
1554+
FeatureProvider
1555+
EventHandler
1556+
}{
1557+
NoopProvider{},
1558+
eventingImpl,
1559+
}
1560+
1561+
executor := newEventExecutor()
1562+
1563+
err := executor.registerDefaultProvider(eventingProvider)
1564+
require.NoError(t, err)
1565+
require.Len(t, executor.activeSubscriptions, 1)
1566+
1567+
var eventCount atomic.Int64
1568+
callBack := func(e EventDetails) {
1569+
eventCount.Add(1)
1570+
}
1571+
executor.AddHandler(ProviderReady, &callBack)
1572+
// watch for empty events
1573+
executor.AddHandler("", &callBack)
1574+
eventingImpl.Invoke(Event{EventType: ProviderReady})
1575+
1576+
require.Eventually(t, func() bool {
1577+
return eventCount.Load() >= 1
1578+
}, 100*time.Millisecond, 10*time.Millisecond, "event not received")
1579+
1580+
initialCount := eventCount.Load()
1581+
eventingImpl.Close()
1582+
1583+
<-time.After(100 * time.Millisecond)
1584+
1585+
afterCount := eventCount.Load()
1586+
require.Equal(t, initialCount, afterCount, "goroutine processed events after channel closed - indicates channel closure not detected")
1587+
}

openfeature/util_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ func (s ProviderEventing) EventChannel() <-chan Event {
6565
return s.c
6666
}
6767

68+
func (s ProviderEventing) Close() {
69+
close(s.c)
70+
}
71+
6872
func eventually(t *testing.T, condition func() bool, timeout, interval time.Duration, errMsg string) {
6973
t.Helper()
7074

0 commit comments

Comments
 (0)