Skip to content

Commit ae6d34e

Browse files
(feat): Queue event dispatcher (#88)
* first pass at event queued event dispatcher * fix typo * add unit test and fix dispatch * fix removing bad event * cleanup from Mike Ng's comments * add context * update dispatcher to use context correctly * pass in context * update lint fixes * fix lint issues * fix lint issues. * remove sleep * add some logging * cleanup and add header * merge in fm-beta. remove dispatch callback * update license * add headers after merge
1 parent c67e6c7 commit ae6d34e

File tree

6 files changed

+200
-5
lines changed

6 files changed

+200
-5
lines changed

optimizely/event/dispatcher.go

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,22 @@ package event
1919

2020
import (
2121
"bytes"
22+
"context"
2223
"encoding/json"
2324
"fmt"
2425
"net/http"
26+
"sync"
27+
"time"
2528

2629
"github.com/optimizely/go-sdk/optimizely/logging"
2730
)
2831

2932
const jsonContentType = "application/json"
33+
const maxRetries = 3
34+
const defaultQueueSize = 1000
35+
const sleepTime = 5 * time.Second
36+
37+
var dispatcherLogger = logging.GetLogger("EventDispatcher")
3038

3139
// Dispatcher dispatches events
3240
type Dispatcher interface {
@@ -37,8 +45,6 @@ type Dispatcher interface {
3745
type HTTPEventDispatcher struct {
3846
}
3947

40-
var dispatcherLogger = logging.GetLogger("EventDispatcher")
41-
4248
// DispatchEvent dispatches event with callback
4349
func (*HTTPEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
4450

@@ -54,9 +60,88 @@ func (*HTTPEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
5460
if resp.StatusCode == 204 {
5561
success = true
5662
} else {
57-
dispatcherLogger.Error(fmt.Sprintf("http.Post invalid response %d", resp.StatusCode), err)
63+
dispatcherLogger.Error(fmt.Sprintf("http.Post invalid response %d", resp.StatusCode), nil)
5864
success = false
5965
}
6066
}
6167
return success, err
6268
}
69+
70+
// QueueEventDispatcher is a queued version of the event dispatcher that queues, returns success, and dispatches events in the background
71+
type QueueEventDispatcher struct {
72+
eventQueue Queue
73+
eventFlushLock sync.Mutex
74+
dispatcher *HTTPEventDispatcher
75+
}
76+
77+
// DispatchEvent queues event with callback and calls flush in a go routine.
78+
func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
79+
ed.eventQueue.Add(event)
80+
go func() {
81+
ed.flushEvents()
82+
}()
83+
return true,nil
84+
}
85+
86+
// flush the events
87+
func (ed *QueueEventDispatcher) flushEvents() {
88+
89+
ed.eventFlushLock.Lock()
90+
91+
defer func(){
92+
ed.eventFlushLock.Unlock()
93+
}()
94+
95+
retryCount := 0
96+
97+
for ed.eventQueue.Size() > 0 {
98+
if retryCount > maxRetries {
99+
dispatcherLogger.Error(fmt.Sprintf("event failed to send %d times. It will retry on next event sent", maxRetries), nil)
100+
break
101+
}
102+
103+
items := ed.eventQueue.Get(1)
104+
if len(items) == 0 {
105+
// something happened. Just continue and you should expect size to be zero.
106+
continue
107+
}
108+
event, ok := items[0].(LogEvent)
109+
if !ok {
110+
// remove it
111+
dispatcherLogger.Error("invalid type passed to event dispatcher", nil)
112+
ed.eventQueue.Remove(1)
113+
continue
114+
}
115+
116+
success, err := ed.dispatcher.DispatchEvent(event)
117+
118+
if err == nil {
119+
if success {
120+
dispatcherLogger.Debug(fmt.Sprintf("Dispatched log event %+v", event))
121+
ed.eventQueue.Remove(1)
122+
retryCount = 0
123+
} else {
124+
dispatcherLogger.Warning("dispatch event failed")
125+
// we failed. Sleep some seconds and try again.
126+
time.Sleep(sleepTime)
127+
// increase retryCount. We exit if we have retried x times.
128+
// we will retry again next event that is added.
129+
retryCount++
130+
}
131+
} else {
132+
dispatcherLogger.Error("Error dispatching ", err)
133+
}
134+
}
135+
}
136+
137+
// NewQueueEventDispatcher creates a dispatcher that queues in memory and then sends via go routine.
138+
func NewQueueEventDispatcher(ctx context.Context) Dispatcher {
139+
dispatcher := &QueueEventDispatcher{eventQueue: NewInMemoryQueue(defaultQueueSize), dispatcher:&HTTPEventDispatcher{}}
140+
141+
go func() {
142+
<-ctx.Done()
143+
dispatcher.flushEvents()
144+
}()
145+
146+
return dispatcher
147+
}

optimizely/event/dispatcher_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
// Package event //
18+
package event
19+
20+
import (
21+
"context"
22+
"github.com/optimizely/go-sdk/optimizely/entities"
23+
"github.com/stretchr/testify/assert"
24+
"testing"
25+
"time"
26+
)
27+
28+
func TestQueueEventDispatcher_DispatchEvent(t *testing.T) {
29+
ctx := context.TODO()
30+
q := NewQueueEventDispatcher(ctx)
31+
32+
eventTags := map[string]interface{}{"revenue": 55.0, "value": 25.1}
33+
config := TestConfig{}
34+
35+
conversionUserEvent := CreateConversionUserEvent(config, entities.Event{ExperimentIds: []string{"15402980349"}, ID: "15368860886", Key: "sample_conversion"}, userContext, eventTags)
36+
37+
batch := createBatchEvent(conversionUserEvent, createVisitorFromUserEvent(conversionUserEvent))
38+
39+
logEvent := createLogEvent(batch)
40+
41+
qd, _ := q.(*QueueEventDispatcher)
42+
43+
success, _ := qd.DispatchEvent(logEvent)
44+
45+
assert.True(t, success)
46+
47+
// its been queued
48+
assert.Equal(t,1, qd.eventQueue.Size())
49+
50+
// give the queue a chance to run
51+
time.Sleep(1 * time.Second)
52+
53+
// check the queue
54+
assert.Equal(t,0, qd.eventQueue.Size())
55+
56+
}

optimizely/event/factory_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
// Package event //
118
package event
219

320
import (

optimizely/event/processor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ func NewEventProcessor(exeCtx utils.ExecutionCtx, batchSize, queueSize int, flus
6262
MaxQueueSize: queueSize,
6363
FlushInterval: flushInterval,
6464
Q: NewInMemoryQueue(queueSize),
65-
EventDispatcher: &HTTPEventDispatcher{},
66-
65+
EventDispatcher: NewQueueEventDispatcher(exeCtx.GetContext()),
6766
wg: exeCtx.GetWaitSync(),
6867
}
6968
p.BatchSize = DefaultBatchSize
@@ -118,6 +117,10 @@ func (p *QueueingEventProcessor) StartTicker(ctx context.Context) {
118117
case <-ctx.Done():
119118
pLogger.Debug("Event processor stopped, flushing events.")
120119
p.FlushEvents()
120+
d, ok := p.EventDispatcher.(*QueueEventDispatcher)
121+
if ok {
122+
d.flushEvents()
123+
}
121124
return
122125
}
123126
}

optimizely/event/processor_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
// Package event //
118
package event
219

320
import (

optimizely/event/queue_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
// Package event //
118
package event
219

320
import (

0 commit comments

Comments
 (0)