Skip to content

Commit 9ee240c

Browse files
(feat): add channel queue (#90)
* add channel queues to fm-beta * update for lint issues * add processor tests for ChanQueue * update license header * fix typo in processor_test
1 parent 23f6de1 commit 9ee240c

File tree

3 files changed

+168
-0
lines changed

3 files changed

+168
-0
lines changed

optimizely/event/chan_queue.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
// Package event //
18+
package event
19+
20+
// ChanQueue is a go channel based queue that takes things from the channel and puts them in a in memory queue
21+
type ChanQueue struct {
22+
ch chan interface{}
23+
messages Queue
24+
}
25+
26+
// Get returns queue for given count size
27+
func (i *ChanQueue) Get(count int) []interface{} {
28+
return i.messages.Get(count)
29+
}
30+
31+
// Add appends item to queue
32+
func (i *ChanQueue) Add(item interface{}) {
33+
i.ch <- item
34+
}
35+
36+
// Remove removes item from queue and returns elements slice
37+
func (i *ChanQueue) Remove(count int) []interface{} {
38+
return i.messages.Remove(count)
39+
40+
}
41+
42+
// Size returns size of queue
43+
func (i *ChanQueue) Size() int {
44+
return i.messages.Size()
45+
}
46+
47+
// NewChanQueue returns new go channel based queue with given in memory queueSize
48+
func NewChanQueue(queueSize int) Queue {
49+
50+
ch := make(chan interface{})
51+
52+
i := &ChanQueue{ch:ch, messages: NewInMemoryQueue(queueSize)}
53+
54+
go func() {
55+
for item := range i.ch {
56+
i.messages.Add(item)
57+
}
58+
}()
59+
60+
return i
61+
}
62+

optimizely/event/chan_queue_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
// Package event //
18+
package event
19+
20+
import (
21+
"github.com/stretchr/testify/assert"
22+
"testing"
23+
"time"
24+
)
25+
26+
func TestChanQueue_Add_Size_Remove(t *testing.T) {
27+
q := NewChanQueue(100)
28+
29+
impression := BuildTestImpressionEvent()
30+
conversion := BuildTestConversionEvent()
31+
32+
q.Add(impression)
33+
q.Add(impression)
34+
q.Add(conversion)
35+
36+
time.Sleep(2000 * time.Millisecond)
37+
38+
items1 := q.Get(2)
39+
40+
assert.Equal(t, 2, len(items1))
41+
42+
q.Remove(1)
43+
44+
items2 := q.Get(1)
45+
46+
assert.True(t, len(items2) != 0)
47+
48+
allItems := q.Remove(3)
49+
50+
assert.True(t,len(allItems) > 0)
51+
52+
assert.Equal(t, 0, q.Size())
53+
}

optimizely/event/processor_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,56 @@ func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
172172
assert.Equal(t, 2, len(evs.Event.Visitors))
173173
}
174174
}
175+
176+
func TestChanQueueEventProcessor_ProcessImpression(t *testing.T) {
177+
processor:= &QueueingEventProcessor{
178+
MaxQueueSize: 100,
179+
FlushInterval: 100,
180+
Q: NewChanQueue(100),
181+
EventDispatcher: &HTTPEventDispatcher{},
182+
}
183+
processor.BatchSize = 10
184+
processor.StartTicker(context.TODO())
185+
186+
impression := BuildTestImpressionEvent()
187+
188+
processor.ProcessEvent(impression)
189+
processor.ProcessEvent(impression)
190+
processor.ProcessEvent(impression)
191+
192+
time.Sleep(3000 * time.Millisecond)
193+
194+
assert.NotNil(t, processor.Ticker)
195+
196+
assert.Equal(t, 0, processor.EventsCount())
197+
}
198+
199+
func TestChanQueueEventProcessor_ProcessBatch(t *testing.T) {
200+
processor := &QueueingEventProcessor{MaxQueueSize: 100, FlushInterval: 100, Q: NewChanQueue(100), EventDispatcher: &MockDispatcher{}}
201+
processor.BatchSize = 10
202+
processor.StartTicker(context.TODO())
203+
204+
impression := BuildTestImpressionEvent()
205+
conversion := BuildTestConversionEvent()
206+
207+
processor.ProcessEvent(impression)
208+
processor.ProcessEvent(impression)
209+
processor.ProcessEvent(conversion)
210+
processor.ProcessEvent(conversion)
211+
212+
time.Sleep(3000 * time.Millisecond)
213+
214+
assert.NotNil(t, processor.Ticker)
215+
216+
assert.Equal(t, 0, processor.EventsCount())
217+
218+
time.Sleep(3000 * time.Millisecond)
219+
220+
result, ok := (processor.EventDispatcher).(*MockDispatcher)
221+
222+
if ok {
223+
assert.Equal(t, 1, len(result.Events))
224+
evs := result.Events[0]
225+
assert.True(t, len(evs.Event.Visitors) >= 1)
226+
}
227+
}

0 commit comments

Comments
 (0)