Skip to content

Commit 3bb1521

Browse files
added blocking call to flush dispatcher
1 parent 52f864a commit 3bb1521

File tree

4 files changed

+19
-32
lines changed

4 files changed

+19
-32
lines changed

examples/main.go

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
package main
55

66
import (
7-
"context"
87
"fmt"
9-
"time"
10-
118
"github.com/optimizely/go-sdk/optimizely/client"
129
"github.com/optimizely/go-sdk/optimizely/entities"
1310
"github.com/optimizely/go-sdk/optimizely/logging"
@@ -40,40 +37,15 @@ func main() {
4037
fmt.Printf("Is feature enabled? %v\n", enabled)
4138

4239
fmt.Println()
43-
44-
/************* ClientWithOptions - custom context ********************/
45-
46-
optimizelyFactory = &client.OptimizelyFactory{
47-
SDKKey: "4SLpaJA1r1pgE6T2CoMs9q",
48-
}
49-
ctx := context.Background()
50-
ctx, cancelManager := context.WithCancel(ctx) // user can set up any context
51-
clientOptions := client.Options{
52-
Context: ctx,
53-
}
54-
55-
app, err = optimizelyFactory.ClientWithOptions(clientOptions)
56-
cancelManager() // user can cancel anytime
57-
58-
if err != nil {
59-
fmt.Printf("Error instantiating client: %s", err)
60-
return
61-
}
62-
63-
enabled, _ = app.IsFeatureEnabled("mutext_feat", user)
64-
fmt.Printf("Is feature enabled? %v\n", enabled)
65-
66-
time.Sleep(1000 * time.Millisecond)
40+
app.Close() // user can close dispatcher
6741
fmt.Println()
68-
6942
/************* Client ********************/
7043

7144
optimizelyFactory = &client.OptimizelyFactory{
7245
SDKKey: "4SLpaJA1r1pgE6T2CoMs9q",
7346
}
7447

7548
app, err = optimizelyFactory.Client()
76-
app.Close() // user can cancel anytime
7749

7850
if err != nil {
7951
fmt.Printf("Error instantiating client: %s", err)
@@ -82,5 +54,5 @@ func main() {
8254

8355
enabled, _ = app.IsFeatureEnabled("mutext_feat", user)
8456
fmt.Printf("Is feature enabled? %v\n", enabled)
85-
57+
app.Close() // user can close dispatcher
8658
}

optimizely/client/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"fmt"
2424
"runtime/debug"
2525
"strconv"
26+
"sync"
2627

2728
"github.com/optimizely/go-sdk/optimizely/event"
2829

@@ -42,6 +43,7 @@ type OptimizelyClient struct {
4243
isValid bool
4344

4445
cancelFunc context.CancelFunc
46+
wg *sync.WaitGroup
4547
}
4648

4749
// IsFeatureEnabled returns true if the feature is enabled for the given user
@@ -280,4 +282,5 @@ func (o *OptimizelyClient) GetProjectConfig() (projectConfig optimizely.ProjectC
280282
// Close closes the Optimizely instance and stops any ongoing tasks from its children components
281283
func (o *OptimizelyClient) Close() {
282284
o.cancelFunc()
285+
o.wg.Wait() //blocking call
283286
}

optimizely/client/factory.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package client
2020
import (
2121
"context"
2222
"errors"
23+
"sync"
2324

2425
"github.com/optimizely/go-sdk/optimizely/event"
2526

@@ -76,8 +77,12 @@ func (f OptimizelyFactory) StaticClient() (*OptimizelyClient, error) {
7677

7778
// ClientWithOptions returns a client initialized with the given configuration options
7879
func (f OptimizelyFactory) ClientWithOptions(clientOptions Options) (*OptimizelyClient, error) {
80+
81+
var wg sync.WaitGroup
82+
7983
client := &OptimizelyClient{
8084
isValid: false,
85+
wg: &wg,
8186
}
8287

8388
var ctx context.Context
@@ -117,7 +122,7 @@ func (f OptimizelyFactory) ClientWithOptions(clientOptions Options) (*Optimizely
117122
if clientOptions.EventProcessor != nil {
118123
client.eventProcessor = clientOptions.EventProcessor
119124
} else {
120-
client.eventProcessor = event.NewEventProcessor(ctx, event.DefaultBatchSize, event.DefaultEventQueueSize, event.DefaultEventFlushInterval)
125+
client.eventProcessor = event.NewEventProcessor(ctx, event.DefaultBatchSize, event.DefaultEventQueueSize, event.DefaultEventFlushInterval, &wg)
121126
}
122127

123128
client.isValid = true

optimizely/event/processor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type QueueingEventProcessor struct {
4040
Mux sync.Mutex
4141
Ticker *time.Ticker
4242
EventDispatcher Dispatcher
43+
44+
wg *sync.WaitGroup
4345
}
4446

4547
// DefaultBatchSize holds the default value for the batch size
@@ -54,18 +56,21 @@ const DefaultEventFlushInterval = 30 * time.Second
5456
var pLogger = logging.GetLogger("EventProcessor")
5557

5658
// NewEventProcessor returns a new instance of QueueingEventProcessor with queueSize and flushInterval
57-
func NewEventProcessor(ctx context.Context, batchSize, queueSize int, flushInterval time.Duration) *QueueingEventProcessor {
59+
func NewEventProcessor(ctx context.Context, batchSize, queueSize int, flushInterval time.Duration, wg *sync.WaitGroup) *QueueingEventProcessor {
5860
p := &QueueingEventProcessor{
5961
MaxQueueSize: queueSize,
6062
FlushInterval: flushInterval,
6163
Q: NewInMemoryQueue(queueSize),
6264
EventDispatcher: &HTTPEventDispatcher{},
65+
66+
wg: wg,
6367
}
6468
p.BatchSize = DefaultBatchSize
6569
if batchSize > 0 {
6670
p.BatchSize = batchSize
6771
}
6872

73+
p.wg.Add(1)
6974
p.StartTicker(ctx)
7075
return p
7176
}
@@ -103,6 +108,8 @@ func (p *QueueingEventProcessor) StartTicker(ctx context.Context) {
103108
}
104109
p.Ticker = time.NewTicker(p.FlushInterval * time.Millisecond)
105110
go func() {
111+
112+
defer p.wg.Done()
106113
for {
107114
select {
108115
case <-p.Ticker.C:

0 commit comments

Comments
 (0)