@@ -2,6 +2,7 @@ package event
2
2
3
3
import (
4
4
"context"
5
+ "sync"
5
6
"testing"
6
7
"time"
7
8
@@ -10,8 +11,8 @@ import (
10
11
11
12
func TestDefaultEventProcessor_ProcessImpression (t * testing.T ) {
12
13
ctx := context .Background ()
13
-
14
- processor := NewEventProcessor (ctx , 10 , 100 , 100 )
14
+ var wg sync. WaitGroup
15
+ processor := NewEventProcessor (ctx , 10 , 100 , 100 , & wg )
15
16
16
17
impression := BuildTestImpressionEvent ()
17
18
@@ -36,11 +37,13 @@ func (f *MockDispatcher) DispatchEvent(event LogEvent, callback func(success boo
36
37
}
37
38
38
39
func TestDefaultEventProcessor_ProcessBatch (t * testing.T ) {
40
+ var wg sync.WaitGroup
39
41
processor := & QueueingEventProcessor {
40
42
MaxQueueSize : 100 ,
41
43
FlushInterval : 100 ,
42
44
Q : NewInMemoryQueue (100 ),
43
45
EventDispatcher : & MockDispatcher {},
46
+ wg : & wg ,
44
47
}
45
48
processor .BatchSize = 10
46
49
processor .StartTicker (context .TODO ())
@@ -72,11 +75,13 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
72
75
73
76
func TestBatchEventProcessor_FlushesOnClose (t * testing.T ) {
74
77
ctx , cancelFn := context .WithCancel (context .Background ())
78
+ var wg sync.WaitGroup
75
79
processor := & QueueingEventProcessor {
76
80
MaxQueueSize : 100 ,
77
81
FlushInterval : 30 * time .Second ,
78
82
Q : NewInMemoryQueue (100 ),
79
83
EventDispatcher : & MockDispatcher {},
84
+ wg : & wg ,
80
85
}
81
86
processor .BatchSize = 10
82
87
processor .StartTicker (ctx )
@@ -102,11 +107,13 @@ func TestBatchEventProcessor_FlushesOnClose(t *testing.T) {
102
107
}
103
108
104
109
func TestDefaultEventProcessor_ProcessBatchRevisionMismatch (t * testing.T ) {
110
+ var wg sync.WaitGroup
105
111
processor := & QueueingEventProcessor {
106
112
MaxQueueSize : 100 ,
107
113
FlushInterval : 100 ,
108
114
Q : NewInMemoryQueue (100 ),
109
115
EventDispatcher : & MockDispatcher {},
116
+ wg : & wg ,
110
117
}
111
118
processor .BatchSize = 10
112
119
processor .StartTicker (context .TODO ())
@@ -138,11 +145,13 @@ func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
138
145
}
139
146
140
147
func TestDefaultEventProcessor_ProcessBatchProjectMismatch (t * testing.T ) {
148
+ var wg sync.WaitGroup
141
149
processor := & QueueingEventProcessor {
142
150
MaxQueueSize : 100 ,
143
151
FlushInterval : 100 ,
144
152
Q : NewInMemoryQueue (100 ),
145
153
EventDispatcher : & MockDispatcher {},
154
+ wg : & wg ,
146
155
}
147
156
processor .BatchSize = 10
148
157
processor .StartTicker (context .TODO ())
@@ -174,11 +183,13 @@ func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
174
183
}
175
184
176
185
func TestChanQueueEventProcessor_ProcessImpression (t * testing.T ) {
177
- processor := & QueueingEventProcessor {
186
+ var wg sync.WaitGroup
187
+ processor := & QueueingEventProcessor {
178
188
MaxQueueSize : 100 ,
179
189
FlushInterval : 100 ,
180
190
Q : NewChanQueue (100 ),
181
191
EventDispatcher : & HTTPEventDispatcher {},
192
+ wg : & wg ,
182
193
}
183
194
processor .BatchSize = 10
184
195
processor .StartTicker (context .TODO ())
@@ -197,7 +208,8 @@ func TestChanQueueEventProcessor_ProcessImpression(t *testing.T) {
197
208
}
198
209
199
210
func TestChanQueueEventProcessor_ProcessBatch (t * testing.T ) {
200
- processor := & QueueingEventProcessor {MaxQueueSize : 100 , FlushInterval : 100 , Q : NewChanQueue (100 ), EventDispatcher : & MockDispatcher {}}
211
+ var wg sync.WaitGroup
212
+ processor := & QueueingEventProcessor {MaxQueueSize : 100 , FlushInterval : 100 , Q : NewChanQueue (100 ), EventDispatcher : & MockDispatcher {}, wg : & wg }
201
213
processor .BatchSize = 10
202
214
processor .StartTicker (context .TODO ())
203
215
0 commit comments