@@ -143,6 +143,41 @@ func TestDefaultEventProcessor_LogEventNotification(t *testing.T) {
143
143
assert .Nil (t , err )
144
144
}
145
145
146
+ func TestDefaultEventProcessor_BatchSizes (t * testing.T ) {
147
+ eg := newExecutionContext ()
148
+ processor := NewBatchEventProcessor (
149
+ WithEventDispatcher (NewMockDispatcher (100 , false )),
150
+ // here we are setting the timing interval so that we don't have to wait the default 30 seconds
151
+ WithFlushInterval (500 * time .Minute ),
152
+ WithBatchSize (50 ))
153
+
154
+ eg .Go (processor .Start )
155
+
156
+ impression := BuildTestImpressionEvent ()
157
+
158
+ for i := 0 ; i < 100 ; i ++ {
159
+ processor .ProcessEvent (impression )
160
+ }
161
+
162
+ // sleep for 1 second here. to allow event processor to run.
163
+ time .Sleep (1 * time .Second )
164
+
165
+ assert .Equal (t , 0 , processor .eventsCount ())
166
+
167
+ result , ok := (processor .EventDispatcher ).(* MockDispatcher )
168
+
169
+ if ok {
170
+ assert .Equal (t , 2 , result .Events .Size ())
171
+ evs := result .Events .Get (3 )
172
+ logEvent , _ := evs [0 ].(LogEvent )
173
+ assert .Equal (t , 50 , len (logEvent .Event .Visitors ))
174
+ logEvent , _ = evs [1 ].(LogEvent )
175
+ assert .Equal (t , 50 , len (logEvent .Event .Visitors ))
176
+
177
+ }
178
+ eg .TerminateAndWait ()
179
+ }
180
+
146
181
func TestDefaultEventProcessor_DefaultConfig (t * testing.T ) {
147
182
eg := newExecutionContext ()
148
183
dispatcher := NewMockDispatcher (100 , false )
0 commit comments