
Commit 16d9c36

Merge pull request #237 from OlivierCazade/NETOBSERV-390
Netobserv 390: fix kafka transformer
2 parents 0e670ba + 30d60a2 commit 16d9c36

3 files changed: +111 additions, −22 deletions


pkg/api/ingest_kafka.go

Lines changed: 2 additions & 0 deletions
@@ -25,4 +25,6 @@ type IngestKafka struct {
     StartOffset      string  `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
     BatchReadTimeout int64   `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
     Decoder          Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
+    BatchMaxLen      int     `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
+    CommitInterval   int64   `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
 }
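The two new knobs are ordinary YAML/JSON-tagged fields, so they ride along with the existing ingest-kafka stage configuration. A minimal, hypothetical sketch (a stripped-down mirror of the struct decoded with gopkg.in/yaml.v2, not the project's actual config loader) of how they would be read from a pipeline config:

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"
)

// ingestKafkaConfig mirrors only the two new fields, for illustration.
type ingestKafkaConfig struct {
    BatchMaxLen    int   `yaml:"batchMaxLen,omitempty"`
    CommitInterval int64 `yaml:"commitInterval,omitempty"`
}

func main() {
    // Sample values taken from the test config added in this commit.
    raw := []byte("batchMaxLen: 1000\ncommitInterval: 1000\n")
    var cfg ingestKafkaConfig
    if err := yaml.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Printf("batchMaxLen=%d commitInterval=%dms\n", cfg.BatchMaxLen, cfg.CommitInterval)
}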

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 46 additions & 16 deletions
@@ -36,16 +36,19 @@ type kafkaReadMessage interface {
 }
 
 type ingestKafka struct {
-    kafkaParams api.IngestKafka
-    kafkaReader kafkaReadMessage
-    decoder     decode.Decoder
-    in          chan string
-    exitChan    <-chan struct{}
-    prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging
+    kafkaParams    api.IngestKafka
+    kafkaReader    kafkaReadMessage
+    decoder        decode.Decoder
+    in             chan string
+    exitChan       <-chan struct{}
+    prevRecords    []config.GenericMap // copy of most recently sent records; for testing and debugging
+    batchMaxLength int
 }
 
 const channelSizeKafka = 1000
-const defaultBatchReadTimeout = int64(100)
+const defaultBatchReadTimeout = int64(1000)
+const defaultKafkaBatchMaxLength = 500
+const defaultKafkaCommitInterval = 500
 
 // Ingest ingests entries from kafka topic
 func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) {
@@ -83,21 +86,36 @@ func (ingestK *ingestKafka) kafkaListener() {
 func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
     var records []interface{}
     duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
+    flushRecords := time.NewTicker(duration)
     for {
         select {
         case <-ingestK.exitChan:
             log.Debugf("exiting ingestKafka because of signal")
             return
         case record := <-ingestK.in:
             records = append(records, record)
-        case <-time.After(duration): // Maximum batch time for each batch
+            if len(records) >= ingestK.batchMaxLength {
+                log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
+                decoded := ingestK.decoder.Decode(records)
+                out <- decoded
+                ingestK.prevRecords = decoded
+                log.Debugf("prevRecords = %v", ingestK.prevRecords)
+                records = []interface{}{}
+            }
+        case <-flushRecords.C: // Maximum batch time for each batch
             // Process batch of records (if not empty)
             if len(records) > 0 {
-                log.Debugf("ingestKafka sending %d records", len(records))
+                if len(ingestK.in) > 0 {
+                    for len(records) < ingestK.batchMaxLength && len(ingestK.in) > 0 {
+                        record := <-ingestK.in
+                        records = append(records, record)
+                    }
+                }
+                log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
                 decoded := ingestK.decoder.Decode(records)
-                out <- decoded
                 ingestK.prevRecords = decoded
                 log.Debugf("prevRecords = %v", ingestK.prevRecords)
+                out <- decoded
             }
             records = []interface{}{}
         }
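This hunk replaces the per-iteration time.After with a long-lived ticker and adds a size cap, so a batch is forwarded as soon as it reaches batchMaxLength instead of only when the timer fires. A self-contained sketch of that flush-on-size-or-ticker pattern, with plain strings standing in for the pipeline's decoder and config.GenericMap types (all names here are illustrative):

package main

import (
    "fmt"
    "time"
)

// batchLoop accumulates records and flushes them either when the batch
// reaches maxLen or when the flush ticker fires, whichever comes first.
func batchLoop(in <-chan string, out chan<- []string, maxLen int, flushEvery time.Duration, exit <-chan struct{}) {
    var batch []string
    ticker := time.NewTicker(flushEvery)
    defer ticker.Stop()
    for {
        select {
        case <-exit:
            return
        case rec := <-in:
            batch = append(batch, rec)
            if len(batch) >= maxLen { // size-based flush
                out <- batch
                batch = nil
            }
        case <-ticker.C: // time-based flush, also covers partial batches
            if len(batch) > 0 {
                out <- batch
                batch = nil
            }
        }
    }
}

func main() {
    in := make(chan string, 100)
    out := make(chan []string)
    exit := make(chan struct{})
    go batchLoop(in, out, 3, 100*time.Millisecond, exit)
    for i := 0; i < 5; i++ {
        in <- fmt.Sprintf("flow-%d", i)
    }
    fmt.Println(len(<-out)) // 3: flushed because the size cap was reached
    fmt.Println(len(<-out)) // 2: flushed by the ticker
    close(exit)
}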
@@ -145,12 +163,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
     }
     log.Infof("BatchReadTimeout = %d", jsonIngestKafka.BatchReadTimeout)
 
+    commitInterval := int64(defaultKafkaCommitInterval)
+    if jsonIngestKafka.CommitInterval != 0 {
+        commitInterval = jsonIngestKafka.CommitInterval
+    }
+
     kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
         Brokers:        jsonIngestKafka.Brokers,
         Topic:          jsonIngestKafka.Topic,
         GroupID:        jsonIngestKafka.GroupId,
         GroupBalancers: groupBalancers,
         StartOffset:    startOffset,
+        CommitInterval: time.Duration(commitInterval) * time.Millisecond,
     })
     if kafkaReader == nil {
         errMsg := "NewIngestKafka: failed to create kafka-go reader"
@@ -164,12 +188,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
         return nil, err
     }
 
+    bml := defaultKafkaBatchMaxLength
+    if jsonIngestKafka.BatchMaxLen != 0 {
+        bml = jsonIngestKafka.BatchMaxLen
+    }
+
     return &ingestKafka{
-        kafkaParams: jsonIngestKafka,
-        kafkaReader: kafkaReader,
-        decoder:     decoder,
-        exitChan:    utils.ExitChannel(),
-        in:          make(chan string, channelSizeKafka),
-        prevRecords: make([]config.GenericMap, 0),
+        kafkaParams:    jsonIngestKafka,
+        kafkaReader:    kafkaReader,
+        decoder:        decoder,
+        exitChan:       utils.ExitChannel(),
+        in:             make(chan string, channelSizeKafka),
+        prevRecords:    make([]config.GenericMap, 0),
+        batchMaxLength: bml,
     }, nil
 }
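With the changes above, the reader is built with kafka-go's CommitInterval option, so consumer-group offsets are flushed to the broker periodically instead of synchronously (the behavior when the interval is 0, per the new field's documentation). A standalone sketch of a reader configured this way; broker address, topic, and group ID are placeholders, not values from this repository:

package main

import (
    "context"
    "fmt"
    "time"

    kafkago "github.com/segmentio/kafka-go"
)

func main() {
    r := kafkago.NewReader(kafkago.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        Topic:          "network-flows",
        GroupID:        "flp-ingest",
        StartOffset:    kafkago.FirstOffset,
        CommitInterval: 500 * time.Millisecond, // 0 would commit synchronously
    })
    defer r.Close()

    for i := 0; i < 3; i++ {
        // With a GroupID set, ReadMessage marks the message for commit;
        // commits are batched according to CommitInterval.
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            fmt.Println("read error:", err)
            return
        }
        fmt.Printf("offset=%d value=%s\n", m.Offset, string(m.Value))
    }
}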

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 63 additions & 6 deletions
@@ -64,6 +64,8 @@ parameters:
         groupBalancers: ["rackAffinity"]
         decoder:
           type: json
+        batchMaxLen: 1000
+        commitInterval: 1000
 `
 
 func initNewIngestKafka(t *testing.T, configTemplate string) Ingester {
@@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) {
     require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset)
     require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers))
     require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout)
+    require.Equal(t, int(500), ingestKafka.batchMaxLength)
+    require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
 }
 
 func Test_NewIngestKafka2(t *testing.T) {
@@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) {
     require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset)
     require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers))
     require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
+    require.Equal(t, int(1000), ingestKafka.batchMaxLength)
+    require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
 }
 
 func removeTimestamp(receivedEntries []config.GenericMap) {
@@ -141,17 +147,16 @@ func Test_IngestKafka(t *testing.T) {
 }
 
 type fakeKafkaReader struct {
+    readToDo int
     mock.Mock
 }
 
 var fakeRecord = []byte(`{"Bytes":20801,"DstAddr":"10.130.2.1","DstPort":36936,"Packets":401,"SrcAddr":"10.130.2.13","SrcPort":3100}`)
 
-var performedRead = false
-
 // ReadMessage runs in the kafka client thread, which blocks until data is available.
-// If data is always available, we have an infinite loop. So we return data only once.
+// If data is always available, we have an infinite loop. So we return data only a specified number of times.
 func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
-    if performedRead {
+    if f.readToDo == 0 {
         // block indefinitely
         c := make(chan struct{})
         <-c
@@ -160,7 +165,7 @@ func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
         Topic: "topic1",
         Value: fakeRecord,
     }
-    performedRead = true
+    f.readToDo -= 1
     return message, nil
 }
 
@@ -174,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
     ingestKafka := newIngest.(*ingestKafka)
 
     // change the ReadMessage function to the mock-up
-    fr := fakeKafkaReader{}
+    fr := fakeKafkaReader{readToDo: 1}
     ingestKafka.kafkaReader = &fr
 
     // run Ingest in a separate thread
@@ -192,3 +197,55 @@ func Test_KafkaListener(t *testing.T) {
     require.Equal(t, 1, len(receivedEntries))
     require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
 }
+
+func Test_MaxBatchLength(t *testing.T) {
+    ingestOutput := make(chan []config.GenericMap)
+    newIngest := initNewIngestKafka(t, testConfig1)
+    ingestKafka := newIngest.(*ingestKafka)
+
+    // change the ReadMessage function to the mock-up
+    fr := fakeKafkaReader{readToDo: 15}
+    ingestKafka.kafkaReader = &fr
+    ingestKafka.batchMaxLength = 10
+    ingestKafka.kafkaParams.BatchReadTimeout = 10000
+
+    // run Ingest in a separate thread
+    go func() {
+        ingestKafka.Ingest(ingestOutput)
+    }()
+
+    // wait for the data to have been processed
+    receivedEntries := <-ingestOutput
+
+    require.Equal(t, 10, len(receivedEntries))
+}
+
+func Test_BatchTimeout(t *testing.T) {
+    ingestOutput := make(chan []config.GenericMap)
+    newIngest := initNewIngestKafka(t, testConfig1)
+    ingestKafka := newIngest.(*ingestKafka)
+
+    // change the ReadMessage function to the mock-up
+    fr := fakeKafkaReader{readToDo: 5}
+    ingestKafka.kafkaReader = &fr
+    ingestKafka.batchMaxLength = 1000
+    ingestKafka.kafkaParams.BatchReadTimeout = 100
+
+    beforeIngest := time.Now()
+    // run Ingest in a separate thread
+    go func() {
+        ingestKafka.Ingest(ingestOutput)
+    }()
+
+    require.Equal(t, 0, len(ingestOutput))
+    // wait for the data to have been processed
+    receivedEntries := <-ingestOutput
+    require.Equal(t, 5, len(receivedEntries))
+
+    afterIngest := time.Now()
+
+    // We check that we get entries because of the timer
+    // Time must be above timer value but not too much, 20ms is our margin here
+    require.LessOrEqual(t, int64(100), afterIngest.Sub(beforeIngest).Milliseconds())
+    require.Greater(t, int64(120), afterIngest.Sub(beforeIngest).Milliseconds())
+}
