Skip to content

Commit 5477d16

Browse files
author
Mike Davis
authored
Refactor execution context. (#212)
* Allow OptimizelyFactory to accept a plain old context object (POCO) * Rework ExecutionCtx into ExecGroup (ala errgroup) * Refactor Start methods to NOT manage their own goroutines, but assume to be executed within one (this is similar to http.ListenAndServe)
1 parent 125346f commit 5477d16

13 files changed

+265
-310
lines changed

pkg/client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type OptimizelyClient struct {
4040
DecisionService decision.Service
4141
EventProcessor event.Processor
4242
notificationCenter notification.Center
43-
executionCtx utils.ExecutionCtx
43+
execGroup *utils.ExecGroup
4444
}
4545

4646
// Activate returns the key of the variation the user is bucketed into and queues up an impression event to be sent to
@@ -488,5 +488,5 @@ func (o *OptimizelyClient) GetProjectConfig() (projectConfig config.ProjectConfi
488488

489489
// Close closes the Optimizely instance and stops any ongoing tasks from its children components.
490490
func (o *OptimizelyClient) Close() {
491-
o.executionCtx.TerminateAndWait()
491+
o.execGroup.TerminateAndWait()
492492
}

pkg/client/client_test.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,31 +28,13 @@ import (
2828
"github.com/optimizely/go-sdk/pkg/entities"
2929
"github.com/optimizely/go-sdk/pkg/event"
3030
"github.com/optimizely/go-sdk/pkg/notification"
31+
"github.com/optimizely/go-sdk/pkg/utils"
3132

3233
"github.com/stretchr/testify/assert"
3334
"github.com/stretchr/testify/mock"
3435
"github.com/stretchr/testify/suite"
3536
)
3637

37-
var exeCtxSignalFlag bool
38-
39-
type ExecutionCtx struct {
40-
Wg *sync.WaitGroup
41-
Ctx context.Context
42-
}
43-
44-
func (ctx ExecutionCtx) TerminateAndWait() {
45-
exeCtxSignalFlag = true
46-
}
47-
48-
func (ctx ExecutionCtx) GetContext() context.Context {
49-
return ctx.Ctx
50-
}
51-
52-
func (ctx ExecutionCtx) GetWaitSync() *sync.WaitGroup {
53-
return ctx.Wg
54-
}
55-
5638
func ValidProjectConfigManager() *MockProjectConfigManager {
5739
p := new(MockProjectConfigManager)
5840
p.projectConfig = new(TestConfig)
@@ -1934,17 +1916,24 @@ func TestClose(t *testing.T) {
19341916
mockProcessor := &MockProcessor{}
19351917
mockDecisionService := new(MockDecisionService)
19361918

1919+
eg := utils.NewExecGroup(context.Background())
1920+
1921+
wg := &sync.WaitGroup{}
1922+
wg.Add(1)
1923+
eg.Go(func(ctx context.Context) {
1924+
<-ctx.Done()
1925+
wg.Done()
1926+
})
1927+
19371928
client := OptimizelyClient{
19381929
ConfigManager: ValidProjectConfigManager(),
19391930
DecisionService: mockDecisionService,
19401931
EventProcessor: mockProcessor,
1941-
executionCtx: new(ExecutionCtx),
1932+
execGroup: eg,
19421933
}
19431934

1944-
assert.False(t, exeCtxSignalFlag)
19451935
client.Close()
1946-
assert.True(t, exeCtxSignalFlag)
1947-
1936+
wg.Wait()
19481937
}
19491938

19501939
type ClientTestSuiteTrackEvent struct {

pkg/client/factory.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package client
1919

2020
import (
21+
"context"
2122
"errors"
2223
"time"
2324

@@ -34,10 +35,10 @@ type OptimizelyFactory struct {
3435
Datafile []byte
3536

3637
configManager config.ProjectConfigManager
38+
ctx context.Context
3739
decisionService decision.Service
3840
eventDispatcher event.Dispatcher
3941
eventProcessor event.Processor
40-
executionCtx utils.ExecutionCtx
4142
userProfileService decision.UserProfileService
4243
overrideStore decision.ExperimentOverrideStore
4344
}
@@ -56,14 +57,15 @@ func (f OptimizelyFactory) Client(clientOptions ...OptionFunc) (*OptimizelyClien
5657
return nil, errors.New("unable to instantiate client: no project config manager, SDK key, or a Datafile provided")
5758
}
5859

59-
var executionCtx utils.ExecutionCtx
60-
if f.executionCtx != nil {
61-
executionCtx = f.executionCtx
60+
var ctx context.Context
61+
if f.ctx != nil {
62+
ctx = f.ctx
6263
} else {
63-
executionCtx = utils.NewCancelableExecutionCtx()
64+
ctx = context.Background()
6465
}
6566

66-
appClient := &OptimizelyClient{executionCtx: executionCtx, notificationCenter: registry.GetNotificationCenter(f.SDKKey)}
67+
eg := utils.NewExecGroup(ctx)
68+
appClient := &OptimizelyClient{execGroup: eg, notificationCenter: registry.GetNotificationCenter(f.SDKKey)}
6769

6870
if f.configManager != nil {
6971
appClient.ConfigManager = f.configManager
@@ -103,11 +105,11 @@ func (f OptimizelyFactory) Client(clientOptions ...OptionFunc) (*OptimizelyClien
103105

104106
// Initialize the default services with the execution context
105107
if pollingConfigManager, ok := appClient.ConfigManager.(*config.PollingProjectConfigManager); ok {
106-
pollingConfigManager.Start(appClient.executionCtx)
108+
eg.Go(pollingConfigManager.Start)
107109
}
108110

109111
if batchProcessor, ok := appClient.EventProcessor.(*event.BatchEventProcessor); ok {
110-
batchProcessor.Start(appClient.executionCtx)
112+
eg.Go(batchProcessor.Start)
111113
}
112114

113115
return appClient, nil
@@ -171,10 +173,10 @@ func WithEventDispatcher(eventDispatcher event.Dispatcher) OptionFunc {
171173
}
172174
}
173175

174-
// WithExecutionContext allows user to pass in their own execution context to override the default one in the client.
175-
func WithExecutionContext(executionContext utils.ExecutionCtx) OptionFunc {
176+
// WithContext allows user to pass in their own context to override the default one in the client.
177+
func WithContext(ctx context.Context) OptionFunc {
176178
return func(f *OptimizelyFactory) {
177-
f.executionCtx = executionContext
179+
f.ctx = ctx
178180
}
179181
}
180182

pkg/client/factory_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package client
1818

1919
import (
20+
"context"
2021
"errors"
2122
"net/http"
23+
"sync"
2224
"testing"
2325
"time"
2426

@@ -117,14 +119,23 @@ func TestClientWithDecisionServiceAndEventProcessorInOptions(t *testing.T) {
117119

118120
func TestClientWithCustomCtx(t *testing.T) {
119121
factory := OptimizelyFactory{}
120-
testExecutionCtx := utils.NewCancelableExecutionCtx()
122+
ctx, cancel := context.WithCancel(context.Background())
121123
mockConfigManager := new(MockProjectConfigManager)
122124
client, err := factory.Client(
123125
WithConfigManager(mockConfigManager),
124-
WithExecutionContext(testExecutionCtx),
126+
WithContext(ctx),
125127
)
126128
assert.NoError(t, err)
127-
assert.Equal(t, client.executionCtx, testExecutionCtx)
129+
130+
wg := &sync.WaitGroup{}
131+
wg.Add(1)
132+
client.execGroup.Go(func(ctx context.Context) {
133+
<-ctx.Done()
134+
wg.Done()
135+
})
136+
137+
cancel()
138+
wg.Wait()
128139
}
129140

130141
func TestStaticClient(t *testing.T) {

pkg/config/polling_manager.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package config
1919

2020
import (
21+
"context"
2122
"fmt"
2223
"net/http"
2324
"sync"
@@ -170,20 +171,18 @@ func (cm *PollingProjectConfigManager) SyncConfig(datafile []byte) {
170171
}
171172

172173
// Start starts the polling
173-
func (cm *PollingProjectConfigManager) Start(exeCtx utils.ExecutionCtx) {
174-
go func() {
175-
cmLogger.Debug("Polling Config Manager Initiated")
176-
t := time.NewTicker(cm.pollingInterval)
177-
for {
178-
select {
179-
case <-t.C:
180-
cm.SyncConfig([]byte{})
181-
case <-exeCtx.GetContext().Done():
182-
cmLogger.Debug("Polling Config Manager Stopped")
183-
return
184-
}
174+
func (cm *PollingProjectConfigManager) Start(ctx context.Context) {
175+
cmLogger.Debug("Polling Config Manager Initiated")
176+
t := time.NewTicker(cm.pollingInterval)
177+
for {
178+
select {
179+
case <-t.C:
180+
cm.SyncConfig([]byte{})
181+
case <-ctx.Done():
182+
cmLogger.Debug("Polling Config Manager Stopped")
183+
return
185184
}
186-
}()
185+
}
187186
}
188187

189188
// NewPollingProjectConfigManager returns an instance of the polling config manager with the customized configuration

pkg/config/polling_manager_test.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package config
1818

1919
import (
20+
"context"
2021
"net/http"
2122
"testing"
2223
"time"
@@ -39,6 +40,10 @@ func (m *MockRequester) Get(uri string, headers ...utils.Header) (response []byt
3940
return args.Get(0).([]byte), args.Get(1).(http.Header), args.Int(2), args.Error(3)
4041
}
4142

43+
func newExecGroup() *utils.ExecGroup {
44+
return utils.NewExecGroup(context.Background())
45+
}
46+
4247
func TestNewPollingProjectConfigManagerWithOptions(t *testing.T) {
4348

4449
mockDatafile := []byte(`{"revision":"42"}`)
@@ -49,17 +54,17 @@ func TestNewPollingProjectConfigManagerWithOptions(t *testing.T) {
4954
// Test we fetch using requester
5055
sdkKey := "test_sdk_key"
5156

52-
exeCtx := utils.NewCancelableExecutionCtx()
57+
eg := newExecGroup()
5358
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
54-
configManager.Start(exeCtx)
59+
eg.Go(configManager.Start)
5560
mockRequester.AssertExpectations(t)
5661

5762
actual, err := configManager.GetConfig()
5863
assert.Nil(t, err)
5964
assert.NotNil(t, actual)
6065
assert.Equal(t, projectConfig, actual)
6166

62-
exeCtx.TerminateAndWait() // just sending signal and improving coverage
67+
eg.TerminateAndWait() // just sending signal and improving coverage
6368
}
6469

6570
func TestNewPollingProjectConfigManagerWithNull(t *testing.T) {
@@ -70,9 +75,9 @@ func TestNewPollingProjectConfigManagerWithNull(t *testing.T) {
7075
// Test we fetch using requester
7176
sdkKey := "test_sdk_key"
7277

73-
exeCtx := utils.NewCancelableExecutionCtx()
78+
eg := newExecGroup()
7479
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
75-
configManager.Start(exeCtx)
80+
eg.Go(configManager.Start)
7681
mockRequester.AssertExpectations(t)
7782

7883
_, err := configManager.GetConfig()
@@ -88,9 +93,9 @@ func TestNewPollingProjectConfigManagerWithSimilarDatafileRevisions(t *testing.T
8893

8994
sdkKey := "test_sdk_key"
9095

91-
exeCtx := utils.NewCancelableExecutionCtx()
96+
eg := newExecGroup()
9297
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
93-
configManager.Start(exeCtx)
98+
eg.Go(configManager.Start)
9499
mockRequester.AssertExpectations(t)
95100

96101
actual, err := configManager.GetConfig()
@@ -116,9 +121,9 @@ func TestNewPollingProjectConfigManagerWithLastModifiedDates(t *testing.T) {
116121

117122
sdkKey := "test_sdk_key"
118123

119-
exeCtx := utils.NewCancelableExecutionCtx()
124+
eg := newExecGroup()
120125
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
121-
configManager.Start(exeCtx)
126+
eg.Go(configManager.Start)
122127

123128
// Fetch valid config
124129
actual, err := configManager.GetConfig()
@@ -146,9 +151,9 @@ func TestNewPollingProjectConfigManagerWithDifferentDatafileRevisions(t *testing
146151
// Test we fetch using requester
147152
sdkKey := "test_sdk_key"
148153

149-
exeCtx := utils.NewCancelableExecutionCtx()
154+
eg := newExecGroup()
150155
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
151-
configManager.Start(exeCtx)
156+
eg.Go(configManager.Start)
152157
mockRequester.AssertExpectations(t)
153158

154159
actual, err := configManager.GetConfig()
@@ -173,9 +178,9 @@ func TestNewPollingProjectConfigManagerWithErrorHandling(t *testing.T) {
173178
// Test we fetch using requester
174179
sdkKey := "test_sdk_key"
175180

176-
exeCtx := utils.NewCancelableExecutionCtx()
181+
eg := newExecGroup()
177182
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
178-
configManager.Start(exeCtx)
183+
eg.Go(configManager.Start)
179184
mockRequester.AssertExpectations(t)
180185

181186
actual, err := configManager.GetConfig() // polling for bad file
@@ -204,9 +209,9 @@ func TestNewPollingProjectConfigManagerOnDecision(t *testing.T) {
204209
// Test we fetch using requester
205210
sdkKey := "test_sdk_key"
206211

207-
exeCtx := utils.NewCancelableExecutionCtx()
212+
eg := newExecGroup()
208213
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
209-
configManager.Start(exeCtx)
214+
eg.Go(configManager.Start)
210215

211216
var numberOfCalls = 0
212217
callback := func(notification notification.ProjectConfigUpdateNotification) {
@@ -238,19 +243,19 @@ func TestPollingInterval(t *testing.T) {
238243

239244
sdkKey := "test_sdk_key"
240245

241-
exeCtx := utils.NewCancelableExecutionCtx()
246+
eg := newExecGroup()
242247
configManager := NewPollingProjectConfigManager(sdkKey, WithPollingInterval(5*time.Second))
243-
configManager.Start(exeCtx)
248+
eg.Go(configManager.Start)
244249

245250
assert.Equal(t, configManager.pollingInterval, 5*time.Second)
246251
}
247252

248253
func TestInitialDatafile(t *testing.T) {
249254

250255
sdkKey := "test_sdk_key"
251-
exeCtx := utils.NewCancelableExecutionCtx()
256+
eg := newExecGroup()
252257
configManager := NewPollingProjectConfigManager(sdkKey, WithInitialDatafile([]byte("test")))
253-
configManager.Start(exeCtx)
258+
eg.Go(configManager.Start)
254259

255260
assert.Equal(t, configManager.initDatafile, []byte("test"))
256261
}

pkg/event/dispatcher.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package event
1919

2020
import (
21-
"context"
2221
"fmt"
2322
"net/http"
2423
"sync"
@@ -166,13 +165,10 @@ func (ed *QueueEventDispatcher) flushEvents() {
166165
}
167166

168167
// NewQueueEventDispatcher creates a Dispatcher that queues in memory and then sends via go routine.
169-
func NewQueueEventDispatcher(ctx context.Context) Dispatcher {
170-
dispatcher := &QueueEventDispatcher{eventQueue: NewInMemoryQueue(defaultQueueSize), Dispatcher: &HTTPEventDispatcher{requester: utils.NewHTTPRequester()}, metrics: &DefaultMetrics{}}
171-
172-
go func() {
173-
<-ctx.Done()
174-
dispatcher.flushEvents()
175-
}()
176-
177-
return dispatcher
168+
func NewQueueEventDispatcher() *QueueEventDispatcher {
169+
return &QueueEventDispatcher{
170+
eventQueue: NewInMemoryQueue(defaultQueueSize),
171+
Dispatcher: &HTTPEventDispatcher{requester: utils.NewHTTPRequester()},
172+
metrics: &DefaultMetrics{},
173+
}
178174
}

0 commit comments

Comments
 (0)