Skip to content

Commit 30d60a2

Browse files
committed
Added tests for the batch mechanisms of the Kafka ingester
1 parent d90a2e4 commit 30d60a2

File tree

1 file changed

+57
-6
lines changed

1 file changed

+57
-6
lines changed

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,16 @@ func Test_IngestKafka(t *testing.T) {
147147
}
148148

149149
// fakeKafkaReader is a mock replacement for the kafka-go reader used by the
// ingester under test. readToDo is the number of messages ReadMessage will
// still return before blocking indefinitely.
type fakeKafkaReader struct {
	readToDo int
	mock.Mock
}
152153

153154
// fakeRecord is a single JSON-encoded flow record that the fake reader hands
// to the ingester on each successful ReadMessage call.
var fakeRecord = []byte(`{"Bytes":20801,"DstAddr":"10.130.2.1","DstPort":36936,"Packets":401,"SrcAddr":"10.130.2.13","SrcPort":3100}`)
154155

155-
var performedRead = false
156-
157156
// ReadMessage runs in the kafka client thread, which blocks until data is available.
158-
// If data is always available, we have an infinite loop. So we return data only once.
157+
// If data is always available, we have an infinite loop. So we return data only a specified number of times.
159158
func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
160-
if performedRead {
159+
if f.readToDo == 0 {
161160
// block indefinitely
162161
c := make(chan struct{})
163162
<-c
@@ -166,7 +165,7 @@ func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, err
166165
Topic: "topic1",
167166
Value: fakeRecord,
168167
}
169-
performedRead = true
168+
f.readToDo -= 1
170169
return message, nil
171170
}
172171

@@ -180,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
180179
ingestKafka := newIngest.(*ingestKafka)
181180

182181
// change the ReadMessage function to the mock-up
183-
fr := fakeKafkaReader{}
182+
fr := fakeKafkaReader{readToDo: 1}
184183
ingestKafka.kafkaReader = &fr
185184

186185
// run Ingest in a separate thread
@@ -198,3 +197,55 @@ func Test_KafkaListener(t *testing.T) {
198197
require.Equal(t, 1, len(receivedEntries))
199198
require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
200199
}
200+
201+
func Test_MaxBatchLength(t *testing.T) {
202+
ingestOutput := make(chan []config.GenericMap)
203+
newIngest := initNewIngestKafka(t, testConfig1)
204+
ingestKafka := newIngest.(*ingestKafka)
205+
206+
// change the ReadMessage function to the mock-up
207+
fr := fakeKafkaReader{readToDo: 15}
208+
ingestKafka.kafkaReader = &fr
209+
ingestKafka.batchMaxLength = 10
210+
ingestKafka.kafkaParams.BatchReadTimeout = 10000
211+
212+
// run Ingest in a separate thread
213+
go func() {
214+
ingestKafka.Ingest(ingestOutput)
215+
}()
216+
217+
// wait for the data to have been processed
218+
receivedEntries := <-ingestOutput
219+
220+
require.Equal(t, 10, len(receivedEntries))
221+
}
222+
223+
func Test_BatchTimeout(t *testing.T) {
224+
ingestOutput := make(chan []config.GenericMap)
225+
newIngest := initNewIngestKafka(t, testConfig1)
226+
ingestKafka := newIngest.(*ingestKafka)
227+
228+
// change the ReadMessage function to the mock-up
229+
fr := fakeKafkaReader{readToDo: 5}
230+
ingestKafka.kafkaReader = &fr
231+
ingestKafka.batchMaxLength = 1000
232+
ingestKafka.kafkaParams.BatchReadTimeout = 100
233+
234+
beforeIngest := time.Now()
235+
// run Ingest in a separate thread
236+
go func() {
237+
ingestKafka.Ingest(ingestOutput)
238+
}()
239+
240+
require.Equal(t, 0, len(ingestOutput))
241+
// wait for the data to have been processed
242+
receivedEntries := <-ingestOutput
243+
require.Equal(t, 5, len(receivedEntries))
244+
245+
afterIngest := time.Now()
246+
247+
// We check that we get entries because of the timer
248+
// Time must be above timer value but not too much, 20ms is our margin here
249+
require.LessOrEqual(t, int64(100), afterIngest.Sub(beforeIngest).Milliseconds())
250+
require.Greater(t, int64(120), afterIngest.Sub(beforeIngest).Milliseconds())
251+
}

0 commit comments

Comments
 (0)