Skip to content

Commit 2e7c275

Browse files
author
Mike Davis
authored
Fix unbounded memory allocation. (#275)
* Add bounded capacity to InMemoryQueue directly * Fix concurrency issues in InMemoryQueue when checking queue size * Fix QueueEventDispatcher size gauge metric * Fix leaking goroutines in QueueEventDispatcher
1 parent 98f7650 commit 2e7c275

File tree

6 files changed

+111
-96
lines changed

6 files changed

+111
-96
lines changed

go.sum

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,35 @@
1-
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
2-
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
3-
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
4-
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
5-
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
6-
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
71
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
82
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
93
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
10-
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
4+
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
115
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
126
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
137
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
148
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
159
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
1610
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
1711
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
18-
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
19-
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
20-
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
2112
github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
2213
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
23-
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
24-
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
25-
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
2614
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
2715
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
2816
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
2917
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
30-
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
3118
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
3219
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
3320
github.com/pkg/profile v1.3.0 h1:OQIvuDgm00gWVWGTf4m4mCt6W1/0YqU7Ntg0mySWgaI=
3421
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
3522
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3623
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
37-
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
38-
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
39-
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
40-
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
41-
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
42-
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
43-
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
4424
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
4525
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
46-
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
4726
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
4827
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
4928
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
5029
github.com/twmb/murmur3 v1.0.0 h1:MLMwMEQRKsu94uJnoveYjjHmcLwI3HNcWXP4LJuNe3I=
5130
github.com/twmb/murmur3 v1.0.0/go.mod h1:5Y5m8Y8WIyucaICVP+Aep5C8ydggjEuRQHDq1icoOYo=
52-
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
53-
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
54-
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
5531
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
5632
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
57-
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
58-
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
5933
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
6034
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
6135
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

pkg/event/dispatcher.go

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@ package event
2020
import (
2121
"fmt"
2222
"net/http"
23-
"sync"
2423
"time"
2524

25+
"golang.org/x/sync/semaphore"
26+
2627
"github.com/optimizely/go-sdk/pkg/logging"
2728
"github.com/optimizely/go-sdk/pkg/metrics"
2829
"github.com/optimizely/go-sdk/pkg/utils"
2930
)
3031

32+
const maxWorkers = int64(1)
3133
const maxRetries = 3
3234
const defaultQueueSize = 1000
3335
const sleepTime = 1 * time.Second
@@ -40,7 +42,7 @@ type Dispatcher interface {
4042
// httpEventDispatcher is the HTTP implementation of the Dispatcher interface
4143
type httpEventDispatcher struct {
4244
requester *utils.HTTPRequester
43-
logger logging.OptimizelyLogProducer
45+
logger logging.OptimizelyLogProducer
4446
}
4547

4648
// DispatchEvent dispatches event with callback
@@ -79,39 +81,38 @@ func NewHTTPEventDispatcher(sdkKey string, requester *utils.HTTPRequester, logge
7981

8082
// QueueEventDispatcher is a queued version of the event Dispatcher that queues, returns success, and dispatches events in the background
8183
type QueueEventDispatcher struct {
82-
eventQueue Queue
83-
eventFlushLock sync.Mutex
84-
Dispatcher Dispatcher
85-
logger logging.OptimizelyLogProducer
84+
eventQueue Queue
85+
processing *semaphore.Weighted
86+
Dispatcher Dispatcher
87+
logger logging.OptimizelyLogProducer
8688

8789
// metrics
88-
queueSize metrics.Gauge
89-
sucessFlush metrics.Counter
90-
failFlushCounter metrics.Counter
91-
retryFlushCounter metrics.Counter
90+
queueSizeGauge metrics.Gauge
91+
sucessFlushCounter metrics.Counter
92+
failFlushCounter metrics.Counter
93+
retryFlushCounter metrics.Counter
9294
}
9395

9496
// DispatchEvent queues event with callback and calls flush in a go routine.
9597
func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
9698
ed.eventQueue.Add(event)
97-
go func() {
98-
ed.flushEvents()
99-
}()
99+
go ed.flushEvents()
100100
return true, nil
101101
}
102102

103103
// flush the events
104104
func (ed *QueueEventDispatcher) flushEvents() {
105105

106-
ed.eventFlushLock.Lock()
107-
108-
defer func() {
109-
ed.eventFlushLock.Unlock()
110-
}()
106+
// Limit flushing to a single worker
107+
if !ed.processing.TryAcquire(1) {
108+
return
109+
}
110+
defer ed.processing.Release(1)
111111

112112
retryCount := 0
113-
ed.queueSize.Set(float64(ed.eventQueue.Size()))
114-
for ed.eventQueue.Size() > 0 {
113+
queueSize := ed.eventQueue.Size()
114+
for ; queueSize > 0; queueSize = ed.eventQueue.Size() {
115+
ed.queueSizeGauge.Set(float64(queueSize))
115116
if retryCount > maxRetries {
116117
ed.logger.Error(fmt.Sprintf("event failed to send %d times. It will retry on next event sent", maxRetries), nil)
117118
ed.failFlushCounter.Add(1)
@@ -136,10 +137,10 @@ func (ed *QueueEventDispatcher) flushEvents() {
136137

137138
if err == nil {
138139
if success {
139-
ed.logger.Debug(fmt.Sprintf("Dispatched log event %+v", event))
140+
ed.logger.Debug("dispatch log event succeeded")
140141
ed.eventQueue.Remove(1)
141142
retryCount = 0
142-
ed.sucessFlush.Add(1)
143+
ed.sucessFlushCounter.Add(1)
143144
} else {
144145
ed.logger.Warning("dispatch event failed")
145146
// we failed. Sleep some seconds and try again.
@@ -159,7 +160,7 @@ func (ed *QueueEventDispatcher) flushEvents() {
159160
ed.retryFlushCounter.Add(1)
160161
}
161162
}
162-
ed.queueSize.Set(float64(ed.eventQueue.Size()))
163+
ed.queueSizeGauge.Set(float64(queueSize))
163164
}
164165

165166
// NewQueueEventDispatcher creates a Dispatcher that queues in memory and then sends via go routine.
@@ -172,13 +173,15 @@ func NewQueueEventDispatcher(sdkKey string, metricsRegistry metrics.Registry) *Q
172173
dispatcherMetricsRegistry = metrics.NewNoopRegistry() // protective code to set
173174
}
174175

176+
logger := logging.GetLogger(sdkKey, "QueueEventDispatcher")
175177
return &QueueEventDispatcher{
176-
eventQueue: NewInMemoryQueue(defaultQueueSize),
177-
Dispatcher: NewHTTPEventDispatcher(sdkKey, nil, nil),
178-
queueSize: dispatcherMetricsRegistry.GetGauge(metrics.DispatcherQueueSize),
179-
retryFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherRetryFlush),
180-
failFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherFailedFlush),
181-
sucessFlush: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherSuccessFlush),
182-
logger: logging.GetLogger(sdkKey, "QueueEventDispatcher"),
178+
eventQueue: NewInMemoryQueueWithLogger(defaultQueueSize, logger),
179+
Dispatcher: NewHTTPEventDispatcher(sdkKey, nil, nil),
180+
queueSizeGauge: dispatcherMetricsRegistry.GetGauge(metrics.DispatcherQueueSize),
181+
retryFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherRetryFlush),
182+
failFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherFailedFlush),
183+
sucessFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherSuccessFlush),
184+
logger: logger,
185+
processing: semaphore.NewWeighted(maxWorkers),
183186
}
184187
}

pkg/event/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor {
166166
}
167167

168168
if p.Q == nil {
169-
p.Q = NewInMemoryQueue(p.MaxQueueSize)
169+
p.Q = NewInMemoryQueueWithLogger(p.MaxQueueSize, p.logger)
170170
}
171171

172172
if p.EventDispatcher == nil {

pkg/event/processor_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24-
"github.com/optimizely/go-sdk/pkg/logging"
25-
"github.com/optimizely/go-sdk/pkg/utils"
26-
"github.com/stretchr/testify/assert"
2724
"math"
2825
"testing"
2926
"time"
27+
28+
"github.com/stretchr/testify/assert"
29+
30+
"github.com/optimizely/go-sdk/pkg/logging"
31+
"github.com/optimizely/go-sdk/pkg/utils"
3032
)
3133

3234
type CountingDispatcher struct {
@@ -538,7 +540,7 @@ func BenchmarkProcessor(b *testing.B) {
538540
}
539541

540542
for _, merge := range merges {
541-
for i := 1.; i <= 5; i++ {
543+
for i := 4.; i <= 7; i++ {
542544
qs := int(math.Pow(10, i))
543545
for j := 1; j <= 6; j++ {
544546
bs := 10 * j

pkg/event/queue.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,60 +19,85 @@ package event
1919

2020
import (
2121
"sync"
22+
23+
"github.com/optimizely/go-sdk/pkg/logging"
2224
)
2325

2426
// Queue represents a queue
2527
type Queue interface {
26-
Add(item interface{})
28+
Add(item interface{}) // TODO Should return a bool
2729
Remove(count int) []interface{}
2830
Get(count int) []interface{}
2931
Size() int
3032
}
3133

3234
// InMemoryQueue represents a in-memory queue
3335
type InMemoryQueue struct {
34-
Queue []interface{}
35-
Mux sync.Mutex
36+
logger logging.OptimizelyLogProducer
37+
MaxSize int
38+
Queue []interface{}
39+
Mux sync.Mutex
3640
}
3741

3842
// Get returns queue for given count size
39-
func (i *InMemoryQueue) Get(count int) []interface{} {
40-
if i.Size() < count {
41-
count = i.Size()
42-
}
43-
i.Mux.Lock()
44-
defer i.Mux.Unlock()
45-
return i.Queue[:count]
43+
func (q *InMemoryQueue) Get(count int) []interface{} {
44+
q.Mux.Lock()
45+
defer q.Mux.Unlock()
46+
47+
count = q.getSafeCount(count)
48+
return q.Queue[:count]
4649
}
4750

4851
// Add appends item to queue
49-
func (i *InMemoryQueue) Add(item interface{}) {
50-
i.Mux.Lock()
51-
i.Queue = append(i.Queue, item)
52-
i.Mux.Unlock()
52+
func (q *InMemoryQueue) Add(item interface{}) {
53+
q.Mux.Lock()
54+
defer q.Mux.Unlock()
55+
56+
if len(q.Queue) >= q.MaxSize {
57+
q.logger.Warning("MaxQueueSize has been met. Discarding event")
58+
return
59+
}
60+
61+
q.Queue = append(q.Queue, item)
5362
}
5463

5564
// Remove removes item from queue and returns elements slice
56-
func (i *InMemoryQueue) Remove(count int) []interface{} {
57-
if i.Size() < count {
58-
count = i.Size()
59-
}
60-
i.Mux.Lock()
61-
defer i.Mux.Unlock()
62-
elem := i.Queue[:count]
63-
i.Queue = i.Queue[count:]
65+
func (q *InMemoryQueue) Remove(count int) []interface{} {
66+
q.Mux.Lock()
67+
defer q.Mux.Unlock()
68+
69+
count = q.getSafeCount(count)
70+
elem := q.Queue[:count]
71+
q.Queue = q.Queue[count:]
6472
return elem
6573
}
6674

75+
func (q *InMemoryQueue) getSafeCount(count int) int {
76+
if size := len(q.Queue); size < count {
77+
return size
78+
}
79+
80+
return count
81+
}
82+
6783
// Size returns size of queue
68-
func (i *InMemoryQueue) Size() int {
69-
i.Mux.Lock()
70-
defer i.Mux.Unlock()
71-
return len(i.Queue)
84+
func (q *InMemoryQueue) Size() int {
85+
q.Mux.Lock()
86+
defer q.Mux.Unlock()
87+
return len(q.Queue)
7288
}
7389

7490
// NewInMemoryQueue returns new InMemoryQueue with given queueSize
7591
func NewInMemoryQueue(queueSize int) Queue {
76-
i := &InMemoryQueue{Queue: make([]interface{}, 0, queueSize)}
77-
return i
92+
logger := logging.GetLogger("", "InMemoryQueue")
93+
return NewInMemoryQueueWithLogger(queueSize, logger)
94+
}
95+
96+
// NewInMemoryQueueWithLogger returns new InMemoryQueue with given queueSize and logger
97+
func NewInMemoryQueueWithLogger(queueSize int, logger logging.OptimizelyLogProducer) Queue {
98+
return &InMemoryQueue{
99+
logger: logger,
100+
MaxSize: queueSize,
101+
Queue: make([]interface{}, 0, queueSize),
102+
}
78103
}

0 commit comments

Comments
 (0)