Skip to content

Commit 4576e34

Browse files
committed
fix: make SetProviderAndWait behavior consistent with SetProvider
The OpenFeature specification defines SetProviderAndWait as a waiting version of SetProvider, or as a shortcut for waiting on the provider ready event. However, SetProvider and SetProviderAndWait currently exhibit non-trivial behavior differences besides waiting. SetProvider runs initialization asynchronously and potentially concurrently with the shutdown of the old provider. The API is not blocked, and the application author may initialize other providers concurrently, run evaluations, etc. 🐛: in this mode, the error from the initializer is ignored when updating the provider state, so fatal/error states may not be set properly. SetProviderAndWait runs initialization synchronously while holding the exclusive api.mu lock. This almost completely locks the OpenFeature SDK: the application author cannot initialize other providers (for different domains), configure context or hooks, evaluate feature flags, or shut down the SDK. If a provider's initialization blocks indefinitely, the SDK remains unusable and is unrecoverable. Another difference is that the old provider is shut down only after the new provider has successfully initialized. 🐛: if the new provider fails to initialize, the old provider is already unset in the API but will never be shut down, and the new provider is not registered with api.eventExecutor (so if it comes back online after some time, nobody listens to its events, and the state will go out of sync if the old provider continues emitting events). 🐛: in both modes, given that shutdown runs concurrently with updating subscriptions in eventExecutor, it is possible for the old provider to override the state of the new provider: 1. init finishes, emits the provider ready event (directly from the goroutine), and updates the state; 2. the old provider emits some event during shutdown (e.g., PROVIDER_ERROR or PROVIDER_STALE), and eventExecutor receives the event and updates the state to error/stale; 3. the new provider is registered with eventExecutor, but the state is already wrong. 
This PR introduces several changes: Make the initialization flow consistent across both modes: always initialize asynchronously, but make the "AndWait" methods wait for initialization outside of the critical section. Make init respect the returned error. Always call shutdown on the old provider (if it is no longer used). Always register the new provider with the event executor. Do this before we start init/shutdown, so the old provider cannot influence the state of the new provider. Make event executor registration non-erroring by making the shutdown channel buffered (there was no good way to recover from a registration error). Signed-off-by: Oleksii Shmalko <oleksii.shmalko@datadoghq.com>
1 parent 5b45d60 commit 4576e34

File tree

4 files changed

+117
-196
lines changed

4 files changed

+117
-196
lines changed

openfeature/event_executor.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package openfeature
22

33
import (
4-
"fmt"
54
"log/slog"
65
"maps"
76
"slices"
87
"sync"
9-
"time"
108
)
119

1210
const defaultDomain = ""
@@ -192,32 +190,32 @@ func (e *eventExecutor) State(domain string) State {
192190
}
193191

194192
// registerDefaultProvider registers the default FeatureProvider and removes the old default provider if available
195-
func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) error {
193+
func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) {
196194
e.mu.Lock()
197195
defer e.mu.Unlock()
198196

199197
newProvider := newProviderRef(provider)
200198
oldProvider := e.defaultProviderReference
201199
e.defaultProviderReference = newProvider
202200

203-
return e.startListeningAndShutdownOld(newProvider, oldProvider)
201+
e.startListeningAndShutdownOld(newProvider, oldProvider)
204202
}
205203

206204
// registerNamedEventingProvider registers a named FeatureProvider and removes the event listener for the old named provider
207-
func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, provider FeatureProvider) error {
205+
func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, provider FeatureProvider) {
208206
e.mu.Lock()
209207
defer e.mu.Unlock()
210208
newProvider := newProviderRef(provider)
211209

212210
oldProvider := e.namedProviderReference[associatedClient]
213211
e.namedProviderReference[associatedClient] = newProvider
214212

215-
return e.startListeningAndShutdownOld(newProvider, oldProvider)
213+
e.startListeningAndShutdownOld(newProvider, oldProvider)
216214
}
217215

218216
// startListeningAndShutdownOld is a helper to start concurrent listening to new provider events and invoke shutdown
219217
// hook of the old provider if it's not bound by another subscription
220-
func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) error {
218+
func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) {
221219
// check if this provider already actively handled - 1:N binding capability
222220
if !isRunning(newProvider, e.activeSubscriptions) {
223221
e.activeSubscriptions = append(e.activeSubscriptions, newProvider)
@@ -247,7 +245,7 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen
247245

248246
// check if this provider is still bound - 1:N binding capability
249247
if isBound(oldReference, e.defaultProviderReference, slices.Collect(maps.Values(e.namedProviderReference))) {
250-
return nil
248+
return
251249
}
252250

253251
// drop from active references
@@ -258,16 +256,28 @@ func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReferen
258256
_, ok := oldReference.featureProvider.(EventHandler)
259257
if !ok {
260258
// no shutdown for non event handling provider
261-
return nil
259+
return
262260
}
263261

264262
// avoid shutdown lockouts
265263
select {
266264
case oldReference.shutdownSemaphore <- "":
267-
return nil
268-
case <-time.After(200 * time.Millisecond):
269-
return fmt.Errorf("old event handler %s timeout waiting for handler shutdown",
270-
oldReference.featureProvider.Metadata().Name)
265+
default:
266+
// This should never happen:
267+
//
268+
// providerReference.shutdownSemaphore is created with
269+
// a buffer size of 1, so it should allow sending at
270+
// least one shutdown signal without blocking. Locking
271+
// should prevent us from sending more than one
272+
// signal.
273+
//
274+
// In the unlikely case that it does not, this
275+
// shouldn't be a big deal: we have already swapped
276+
// references in eventExecutor and openfeatureAPI, and
277+
// the handler should be able to receive at least one
278+
// shutdown signal later.
279+
slog.Info("OF BUG: failed to send shutdown to old event handler",
280+
"provider", oldReference.featureProvider.Metadata().Name)
271281
}
272282
}
273283

openfeature/event_executor_test.go

Lines changed: 20 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,13 @@ func TestEventHandler_RegisterUnregisterEventProvider(t *testing.T) {
3030
}
3131

3232
executor := newEventExecutor()
33-
err := executor.registerDefaultProvider(eventingProvider)
34-
if err != nil {
35-
t.Fatal(err)
36-
}
33+
executor.registerDefaultProvider(eventingProvider)
3734

3835
if executor.defaultProviderReference.featureProvider != eventingProvider {
3936
t.Error("implementation should register default eventing provider")
4037
}
4138

42-
err = executor.registerNamedEventingProvider("domain", eventingProvider)
43-
if err != nil {
44-
t.Fatal(err)
45-
}
39+
executor.registerNamedEventingProvider("domain", eventingProvider)
4640

4741
if _, ok := executor.namedProviderReference["domain"]; !ok {
4842
t.Errorf("implementation should register named eventing provider")
@@ -966,7 +960,6 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
966960
eventingImpl := &ProviderEventing{
967961
c: make(chan Event, 1),
968962
}
969-
eventingImpl.Invoke(Event{EventType: ProviderError})
970963

971964
provider := struct {
972965
FeatureProvider
@@ -990,6 +983,9 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
990983
AddHandler(ProviderStale, &callback)
991984
AddHandler(ProviderConfigChange, &callback)
992985

986+
// Emit error event from the provider
987+
eventingImpl.Invoke(Event{EventType: ProviderError})
988+
993989
// assert client transitioned to ERROR
994990
eventually(t, func() bool {
995991
return NewDefaultClient().State() == ErrorState
@@ -1009,7 +1005,6 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
10091005
eventingImpl := &ProviderEventing{
10101006
c: make(chan Event, 1),
10111007
}
1012-
eventingImpl.Invoke(Event{EventType: ProviderStale})
10131008

10141009
provider := struct {
10151010
FeatureProvider
@@ -1033,6 +1028,9 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
10331028
AddHandler(ProviderError, &callback)
10341029
AddHandler(ProviderConfigChange, &callback)
10351030

1031+
// Emit stale event from the provider.
1032+
eventingImpl.Invoke(Event{EventType: ProviderStale})
1033+
10361034
// assert client transitioned to STALE
10371035
eventually(t, func() bool {
10381036
return NewDefaultClient().State() == StaleState
@@ -1198,25 +1196,16 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
11981196

11991197
executor := newEventExecutor()
12001198

1201-
err := executor.registerDefaultProvider(eventingProvider)
1202-
if err != nil {
1203-
t.Fatal(err)
1204-
}
1199+
executor.registerDefaultProvider(eventingProvider)
12051200

12061201
if len(executor.activeSubscriptions) != 1 &&
12071202
executor.activeSubscriptions[0].featureProvider != eventingProvider.FeatureProvider {
12081203
t.Fatal("provider not added to active provider subscriptions")
12091204
}
12101205

1211-
err = executor.registerNamedEventingProvider("clientA", eventingProvider)
1212-
if err != nil {
1213-
t.Fatal(err)
1214-
}
1206+
executor.registerNamedEventingProvider("clientA", eventingProvider)
12151207

1216-
err = executor.registerNamedEventingProvider("clientB", eventingProvider)
1217-
if err != nil {
1218-
t.Fatal(err)
1219-
}
1208+
executor.registerNamedEventingProvider("clientB", eventingProvider)
12201209

12211210
if len(executor.activeSubscriptions) != 1 {
12221211
t.Fatal("multiple provided in active subscriptions")
@@ -1236,15 +1225,9 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
12361225

12371226
executor := newEventExecutor()
12381227

1239-
err := executor.registerDefaultProvider(eventingProvider)
1240-
if err != nil {
1241-
t.Fatal(err)
1242-
}
1228+
executor.registerDefaultProvider(eventingProvider)
12431229

1244-
err = executor.registerNamedEventingProvider("clientA", eventingProvider)
1245-
if err != nil {
1246-
t.Fatal(err)
1247-
}
1230+
executor.registerNamedEventingProvider("clientA", eventingProvider)
12481231

12491232
overridingProvider := struct {
12501233
FeatureProvider
@@ -1256,10 +1239,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
12561239
},
12571240
}
12581241

1259-
err = executor.registerNamedEventingProvider("clientA", overridingProvider)
1260-
if err != nil {
1261-
t.Fatal(err)
1262-
}
1242+
executor.registerNamedEventingProvider("clientA", overridingProvider)
12631243

12641244
if len(executor.activeSubscriptions) != 2 {
12651245
t.Fatal("error with active provider subscriptions")
@@ -1279,15 +1259,9 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
12791259

12801260
executor := newEventExecutor()
12811261

1282-
err := executor.registerNamedEventingProvider("clientA", eventingProvider)
1283-
if err != nil {
1284-
t.Fatal(err)
1285-
}
1262+
executor.registerNamedEventingProvider("clientA", eventingProvider)
12861263

1287-
err = executor.registerNamedEventingProvider("clientB", eventingProvider)
1288-
if err != nil {
1289-
t.Fatal(err)
1290-
}
1264+
executor.registerNamedEventingProvider("clientB", eventingProvider)
12911265

12921266
overridingProvider := struct {
12931267
FeatureProvider
@@ -1299,10 +1273,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
12991273
},
13001274
}
13011275

1302-
err = executor.registerNamedEventingProvider("clientA", overridingProvider)
1303-
if err != nil {
1304-
t.Fatal(err)
1305-
}
1276+
executor.registerNamedEventingProvider("clientA", overridingProvider)
13061277

13071278
if len(executor.activeSubscriptions) != 2 {
13081279
t.Fatal("error with active provider subscriptions")
@@ -1322,10 +1293,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
13221293

13231294
executor := newEventExecutor()
13241295

1325-
err := executor.registerNamedEventingProvider("clientA", eventingProvider)
1326-
if err != nil {
1327-
t.Fatal(err)
1328-
}
1296+
executor.registerNamedEventingProvider("clientA", eventingProvider)
13291297

13301298
overridingProvider := struct {
13311299
FeatureProvider
@@ -1337,10 +1305,7 @@ func TestEventHandler_1ToNMapping(t *testing.T) {
13371305
},
13381306
}
13391307

1340-
err = executor.registerNamedEventingProvider("clientA", overridingProvider)
1341-
if err != nil {
1342-
t.Fatal(err)
1343-
}
1308+
executor.registerNamedEventingProvider("clientA", overridingProvider)
13441309

13451310
if len(executor.activeSubscriptions) != 1 &&
13461311
executor.activeSubscriptions[0].featureProvider != overridingProvider.FeatureProvider {
@@ -1560,8 +1525,7 @@ func TestEventHandler_ChannelClosure(t *testing.T) {
15601525

15611526
executor := newEventExecutor()
15621527

1563-
err := executor.registerDefaultProvider(eventingProvider)
1564-
require.NoError(t, err)
1528+
executor.registerDefaultProvider(eventingProvider)
15651529
require.Len(t, executor.activeSubscriptions, 1)
15661530

15671531
var eventCount atomic.Int64

0 commit comments

Comments
 (0)