Skip to content

Commit a96c307

Browse files
authored
refact(odp): Removes support for odp batchSize and fixes ticker. (#356)
* Removing support for altering odp batchSize
1 parent a4246e6 commit a96c307

File tree

5 files changed

+166
-123
lines changed

5 files changed

+166
-123
lines changed

pkg/client/factory.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/****************************************************************************
2-
* Copyright 2019-2020,2022 Optimizely, Inc. and contributors *
2+
* Copyright 2019-2020,2022-2023 Optimizely, Inc. and contributors *
33
* *
44
* Licensed under the Apache License, Version 2.0 (the "License"); *
55
* you may not use this file except in compliance with the License. *
@@ -331,10 +331,8 @@ func (f *OptimizelyFactory) startOdpManager(eg *utils.ExecGroup, appClient *Opti
331331
}
332332

333333
// Start odp ticker
334-
apiKey := odpManager.OdpConfig.GetAPIKey()
335-
apiHost := odpManager.OdpConfig.GetAPIHost()
336334
eg.Go(func(ctx context.Context) {
337-
odpManager.EventManager.Start(ctx, apiKey, apiHost)
335+
odpManager.EventManager.Start(ctx, odpManager.OdpConfig)
338336
})
339337

340338
// Only check for changes if ConfigManager is non static
@@ -351,7 +349,7 @@ func (f *OptimizelyFactory) startOdpManager(eg *utils.ExecGroup, appClient *Opti
351349
segmentList := conf.GetSegmentList()
352350
eg.Go(func(ctx context.Context) {
353351
odpManager.Update(apiKey, apiHost, segmentList)
354-
odpManager.EventManager.Start(ctx, apiKey, apiHost)
352+
odpManager.EventManager.Start(ctx, odpManager.OdpConfig)
355353
})
356354
}
357355
}

pkg/client/optimizely_user_context_odp_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/****************************************************************************
2-
* Copyright 2022, Optimizely, Inc. and contributors *
2+
* Copyright 2022-2023, Optimizely, Inc. and contributors *
33
* *
44
* Licensed under the Apache License, Version 2.0 (the "License"); *
55
* you may not use this file except in compliance with the License. *
@@ -211,7 +211,7 @@ func (o *OptimizelyUserContextODPTestSuite) TestFetchQualifiedSegmentsParameters
211211

212212
func (o *OptimizelyUserContextODPTestSuite) TestOdpEventsEarlyEventsDispatched() {
213213
eventAPIManager := &MockEventAPIManager{}
214-
eventManager := event.NewBatchEventManager(event.WithAPIManager(eventAPIManager), event.WithBatchSize(1))
214+
eventManager := event.NewBatchEventManager(event.WithAPIManager(eventAPIManager), event.WithFlushInterval(0))
215215
odpManager := odp.NewOdpManager("", false, odp.WithEventManager(eventManager))
216216
factory := OptimizelyFactory{Datafile: o.datafile, odpManager: odpManager}
217217
optimizelyClient, _ := factory.Client()
@@ -239,9 +239,9 @@ func (o *OptimizelyUserContextODPTestSuite) TestOdpEventsEarlyEventsDispatched()
239239
// userContext := optimizelyClient.CreateUserContext(o.userID, nil)
240240
// var wg sync.WaitGroup
241241
// wg.Add(1)
242-
// userContext.FetchQualifiedSegments(nil, func(segments []string, err error) {
243-
// o.NoError(err)
244-
// o.Equal([]string{}, segments)
242+
// userContext.FetchQualifiedSegmentsAsync(nil, func(success bool) {
243+
// o.True(success)
244+
// o.Equal([]string{}, userContext.GetQualifiedSegments())
245245
// wg.Done()
246246
// })
247247
// wg.Wait()

pkg/odp/event/event_manager.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/****************************************************************************
2-
* Copyright 2022, Optimizely, Inc. and contributors *
2+
* Copyright 2022-2023, Optimizely, Inc. and contributors *
33
* *
44
* Licensed under the Apache License, Version 2.0 (the "License"); *
55
* you may not use this file except in compliance with the License. *
@@ -27,6 +27,7 @@ import (
2727
guuid "github.com/google/uuid"
2828
"github.com/optimizely/go-sdk/pkg/event"
2929
"github.com/optimizely/go-sdk/pkg/logging"
30+
"github.com/optimizely/go-sdk/pkg/odp/config"
3031
"github.com/optimizely/go-sdk/pkg/odp/utils"
3132
"golang.org/x/sync/semaphore"
3233
)
@@ -39,7 +40,8 @@ const maxRetries = 3
3940

4041
// Manager represents the event manager.
4142
type Manager interface {
42-
Start(ctx context.Context, apiKey, apiHost string)
43+
// odpConfig is required here since it can be updated anytime and ticker needs to be aware of latest changes
44+
Start(ctx context.Context, odpConfig config.Config)
4345
IdentifyUser(apiKey, apiHost, userID string)
4446
ProcessEvent(apiKey, apiHost string, odpEvent Event) bool
4547
FlushEvents(apiKey, apiHost string)
@@ -59,16 +61,6 @@ type BatchEventManager struct {
5961
logger logging.OptimizelyLogProducer
6062
}
6163

62-
// WithBatchSize sets the batch size as a config option to be passed into the NewBatchEventManager method
63-
// default value is 10
64-
func WithBatchSize(bsize int) EMOptionFunc {
65-
return func(bm *BatchEventManager) {
66-
if bsize > 0 {
67-
bm.batchSize = bsize
68-
}
69-
}
70-
}
71-
7264
// WithQueueSize sets the queue size as a config option to be passed into the NewBatchEventManager method
7365
// default value is 10000
7466
func WithQueueSize(qsize int) EMOptionFunc {
@@ -83,7 +75,11 @@ func WithQueueSize(qsize int) EMOptionFunc {
8375
// default value is 1 second
8476
func WithFlushInterval(flushInterval time.Duration) EMOptionFunc {
8577
return func(bm *BatchEventManager) {
86-
if flushInterval > 0 {
78+
if flushInterval >= 0 {
79+
// if flush interval is zero, send events immediately by setting batchSize to 1
80+
if flushInterval == 0 {
81+
bm.batchSize = 1
82+
}
8783
bm.flushInterval = flushInterval
8884
}
8985
}
@@ -147,11 +143,12 @@ func NewBatchEventManager(options ...EMOptionFunc) *BatchEventManager {
147143
}
148144

149145
// Start does not do any initialization, just starts the ticker
150-
func (bm *BatchEventManager) Start(ctx context.Context, apiKey, apiHost string) {
151-
if !bm.IsOdpServiceIntegrated(apiKey, apiHost) {
146+
// odpConfig is required here since it can be updated anytime and ticker needs to be aware of latest changes
147+
func (bm *BatchEventManager) Start(ctx context.Context, odpConfig config.Config) {
148+
if !bm.IsOdpServiceIntegrated(odpConfig.GetAPIKey(), odpConfig.GetAPIHost()) {
152149
return
153150
}
154-
bm.startTicker(ctx, apiKey, apiHost)
151+
bm.startTicker(ctx, odpConfig)
155152
}
156153

157154
// IdentifyUser associates a full-stack userid with an established VUID
@@ -209,7 +206,11 @@ func (bm *BatchEventManager) ProcessEvent(apiKey, apiHost string, odpEvent Event
209206
}
210207

211208
// StartTicker starts new ticker for flushing events
212-
func (bm *BatchEventManager) startTicker(ctx context.Context, apiKey, apiHost string) {
209+
func (bm *BatchEventManager) startTicker(ctx context.Context, odpConfig config.Config) {
210+
// Do not start ticker if flushInterval is 0
211+
if bm.flushInterval <= 0 {
212+
return
213+
}
213214
// Make sure multiple go-routines dont reinitialize ticker
214215
bm.flushLock.Lock()
215216
if bm.ticker != nil {
@@ -224,10 +225,10 @@ func (bm *BatchEventManager) startTicker(ctx context.Context, apiKey, apiHost st
224225
for {
225226
select {
226227
case <-bm.ticker.C:
227-
bm.FlushEvents(apiKey, apiHost)
228+
bm.FlushEvents(odpConfig.GetAPIKey(), odpConfig.GetAPIHost())
228229
case <-ctx.Done():
229230
bm.logger.Debug("BatchEventManager stopped, flushing events.")
230-
bm.FlushEvents(apiKey, apiHost)
231+
bm.FlushEvents(odpConfig.GetAPIKey(), odpConfig.GetAPIHost())
231232
return
232233
}
233234
}

0 commit comments

Comments
 (0)