@@ -9,8 +9,12 @@ import (
9
9
"github.com/stretchr/testify/assert"
10
10
)
11
11
12
+ func close (wg * sync.WaitGroup , cancelFn context.CancelFunc ) {
13
+ cancelFn ()
14
+ wg .Wait ()
15
+ }
12
16
func TestDefaultEventProcessor_ProcessImpression (t * testing.T ) {
13
- ctx := context .Background ()
17
+ ctx , cancelFn := context .WithCancel ( context . Background () )
14
18
var wg sync.WaitGroup
15
19
processor := NewEventProcessor (ctx , 10 , 100 , 100 , & wg )
16
20
@@ -20,7 +24,7 @@ func TestDefaultEventProcessor_ProcessImpression(t *testing.T) {
20
24
21
25
assert .Equal (t , 1 , processor .EventsCount ())
22
26
23
- time . Sleep ( 2000 * time . Millisecond )
27
+ close ( & wg , cancelFn )
24
28
25
29
assert .NotNil (t , processor .Ticker )
26
30
@@ -38,6 +42,7 @@ func (f *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
38
42
39
43
func TestDefaultEventProcessor_ProcessBatch (t * testing.T ) {
40
44
var wg sync.WaitGroup
45
+ ctx , cancelFn := context .WithCancel (context .Background ())
41
46
processor := & QueueingEventProcessor {
42
47
MaxQueueSize : 100 ,
43
48
FlushInterval : 100 ,
@@ -46,7 +51,7 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
46
51
wg : & wg ,
47
52
}
48
53
processor .BatchSize = 10
49
- processor .StartTicker (context . TODO () )
54
+ processor .StartTicker (ctx )
50
55
51
56
impression := BuildTestImpressionEvent ()
52
57
conversion := BuildTestConversionEvent ()
@@ -58,7 +63,7 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
58
63
59
64
assert .Equal (t , 4 , processor .EventsCount ())
60
65
61
- time . Sleep ( 200 * time . Millisecond )
66
+ close ( & wg , cancelFn )
62
67
63
68
assert .NotNil (t , processor .Ticker )
64
69
@@ -96,18 +101,15 @@ func TestBatchEventProcessor_FlushesOnClose(t *testing.T) {
96
101
97
102
assert .Equal (t , 4 , processor .EventsCount ())
98
103
99
- time .Sleep (500 * time .Millisecond )
100
-
101
104
// Triggers the flush in the processor
102
- cancelFn ()
103
-
104
- time .Sleep (500 * time .Millisecond )
105
+ close (& wg , cancelFn )
105
106
106
107
assert .Equal (t , 0 , processor .EventsCount ())
107
108
}
108
109
109
110
func TestDefaultEventProcessor_ProcessBatchRevisionMismatch (t * testing.T ) {
110
111
var wg sync.WaitGroup
112
+ ctx , cancelFn := context .WithCancel (context .Background ())
111
113
processor := & QueueingEventProcessor {
112
114
MaxQueueSize : 100 ,
113
115
FlushInterval : 100 ,
@@ -116,7 +118,7 @@ func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
116
118
wg : & wg ,
117
119
}
118
120
processor .BatchSize = 10
119
- processor .StartTicker (context . TODO () )
121
+ processor .StartTicker (ctx )
120
122
121
123
impression := BuildTestImpressionEvent ()
122
124
conversion := BuildTestConversionEvent ()
@@ -129,7 +131,7 @@ func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
129
131
130
132
assert .Equal (t , 4 , processor .EventsCount ())
131
133
132
- time . Sleep ( 200 * time . Millisecond )
134
+ close ( & wg , cancelFn )
133
135
134
136
assert .NotNil (t , processor .Ticker )
135
137
@@ -146,6 +148,7 @@ func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
146
148
147
149
func TestDefaultEventProcessor_ProcessBatchProjectMismatch (t * testing.T ) {
148
150
var wg sync.WaitGroup
151
+ ctx , cancelFn := context .WithCancel (context .Background ())
149
152
processor := & QueueingEventProcessor {
150
153
MaxQueueSize : 100 ,
151
154
FlushInterval : 100 ,
@@ -154,7 +157,7 @@ func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
154
157
wg : & wg ,
155
158
}
156
159
processor .BatchSize = 10
157
- processor .StartTicker (context . TODO () )
160
+ processor .StartTicker (ctx )
158
161
159
162
impression := BuildTestImpressionEvent ()
160
163
conversion := BuildTestConversionEvent ()
@@ -167,7 +170,7 @@ func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
167
170
168
171
assert .Equal (t , 4 , processor .EventsCount ())
169
172
170
- time . Sleep ( 200 * time . Millisecond )
173
+ close ( & wg , cancelFn )
171
174
172
175
assert .NotNil (t , processor .Ticker )
173
176
@@ -184,6 +187,7 @@ func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
184
187
185
188
func TestChanQueueEventProcessor_ProcessImpression (t * testing.T ) {
186
189
var wg sync.WaitGroup
190
+ ctx , cancelFn := context .WithCancel (context .Background ())
187
191
processor := & QueueingEventProcessor {
188
192
MaxQueueSize : 100 ,
189
193
FlushInterval : 100 ,
@@ -192,15 +196,15 @@ func TestChanQueueEventProcessor_ProcessImpression(t *testing.T) {
192
196
wg : & wg ,
193
197
}
194
198
processor .BatchSize = 10
195
- processor .StartTicker (context . TODO () )
199
+ processor .StartTicker (ctx )
196
200
197
201
impression := BuildTestImpressionEvent ()
198
202
199
203
processor .ProcessEvent (impression )
200
204
processor .ProcessEvent (impression )
201
205
processor .ProcessEvent (impression )
202
206
203
- time . Sleep ( 3000 * time . Millisecond )
207
+ close ( & wg , cancelFn )
204
208
205
209
assert .NotNil (t , processor .Ticker )
206
210
@@ -209,9 +213,10 @@ func TestChanQueueEventProcessor_ProcessImpression(t *testing.T) {
209
213
210
214
func TestChanQueueEventProcessor_ProcessBatch (t * testing.T ) {
211
215
var wg sync.WaitGroup
216
+ ctx , cancelFn := context .WithCancel (context .Background ())
212
217
processor := & QueueingEventProcessor {MaxQueueSize : 100 , FlushInterval : 100 , Q : NewChanQueue (100 ), EventDispatcher : & MockDispatcher {}, wg : & wg }
213
218
processor .BatchSize = 10
214
- processor .StartTicker (context . TODO () )
219
+ processor .StartTicker (ctx )
215
220
216
221
impression := BuildTestImpressionEvent ()
217
222
conversion := BuildTestConversionEvent ()
@@ -221,14 +226,12 @@ func TestChanQueueEventProcessor_ProcessBatch(t *testing.T) {
221
226
processor .ProcessEvent (conversion )
222
227
processor .ProcessEvent (conversion )
223
228
224
- time . Sleep ( 3000 * time . Millisecond )
229
+ close ( & wg , cancelFn )
225
230
226
231
assert .NotNil (t , processor .Ticker )
227
232
228
233
assert .Equal (t , 0 , processor .EventsCount ())
229
234
230
- time .Sleep (3000 * time .Millisecond )
231
-
232
235
result , ok := (processor .EventDispatcher ).(* MockDispatcher )
233
236
234
237
if ok {
0 commit comments