Skip to content

Commit 007d0e0

Browse files
authored
Kafka: Add message key and other metadata as metadata in consumer (dapr#3289)
Signed-off-by: Patrick Assuied <[email protected]>
1 parent 0c48ced commit 007d0e0

File tree

3 files changed

+80
-13
lines changed

3 files changed

+80
-13
lines changed

common/component/kafka/consumer.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,7 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
141141

142142
for i, message := range messages {
143143
if message != nil {
144-
metadata := make(map[string]string, len(message.Headers))
145-
if message.Headers != nil {
146-
for _, t := range message.Headers {
147-
metadata[string(t.Key)] = string(t.Value)
148-
}
149-
}
144+
metadata := GetEventMetadata(message)
150145
childMessage := KafkaBulkMessageEntry{
151146
EntryId: strconv.Itoa(i),
152147
Event: message.Value,
@@ -193,20 +188,33 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
193188
Topic: message.Topic,
194189
Data: message.Value,
195190
}
196-
// This is true only when headers are set (Kafka > 0.11)
197-
if len(message.Headers) > 0 {
198-
event.Metadata = make(map[string]string, len(message.Headers))
199-
for _, header := range message.Headers {
200-
event.Metadata[string(header.Key)] = string(header.Value)
201-
}
202-
}
191+
event.Metadata = GetEventMetadata(message)
192+
203193
err = handlerConfig.Handler(session.Context(), &event)
204194
if err == nil {
205195
session.MarkMessage(message, "")
206196
}
207197
return err
208198
}
209199

200+
func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string {
201+
if message != nil {
202+
metadata := make(map[string]string, len(message.Headers)+5)
203+
if message.Key != nil {
204+
metadata[keyMetadataKey] = string(message.Key)
205+
}
206+
metadata[offsetMetadataKey] = strconv.FormatInt(message.Offset, 10)
207+
metadata[topicMetadataKey] = message.Topic
208+
metadata[timestampMetadataKey] = strconv.FormatInt(message.Timestamp.UnixMilli(), 10)
209+
metadata[partitionMetadataKey] = strconv.FormatInt(int64(message.Partition), 10)
210+
for _, header := range message.Headers {
211+
metadata[string(header.Key)] = string(header.Value)
212+
}
213+
return metadata
214+
}
215+
return nil
216+
}
217+
210218
func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
211219
return nil
212220
}

common/component/kafka/metadata.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ import (
2828

2929
const (
3030
key = "partitionKey"
31+
keyMetadataKey = "__key"
32+
timestampMetadataKey = "__timestamp"
33+
offsetMetadataKey = "__offset"
34+
partitionMetadataKey = "__partition"
35+
topicMetadataKey = "__topic"
3136
skipVerify = "skipVerify"
3237
caCert = "caCert"
3338
certificateAuthType = "certificate"

common/component/kafka/metadata_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package kafka
1515

1616
import (
1717
"fmt"
18+
"strconv"
1819
"testing"
1920
"time"
2021

@@ -389,3 +390,56 @@ func TestMetadataChannelBufferSize(t *testing.T) {
389390
require.NoError(t, err)
390391
require.Equal(t, 128, meta.channelBufferSize)
391392
}
393+
394+
func TestGetEventMetadata(t *testing.T) {
395+
ts := time.Now()
396+
397+
t.Run("no headers", func(t *testing.T) {
398+
m := sarama.ConsumerMessage{
399+
Headers: nil, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
400+
}
401+
act := GetEventMetadata(&m)
402+
require.Len(t, act, 5)
403+
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
404+
require.Equal(t, "MyKey", act["__key"])
405+
require.Equal(t, "0", act["__partition"])
406+
require.Equal(t, "123", act["__offset"])
407+
require.Equal(t, "TestTopic", act["__topic"])
408+
})
409+
410+
t.Run("with headers", func(t *testing.T) {
411+
headers := []*sarama.RecordHeader{
412+
{Key: []byte("key1"), Value: []byte("value1")},
413+
{Key: []byte("key2"), Value: []byte("value2")},
414+
}
415+
m := sarama.ConsumerMessage{
416+
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
417+
}
418+
act := GetEventMetadata(&m)
419+
require.Len(t, act, 7)
420+
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
421+
require.Equal(t, "MyKey", act["__key"])
422+
require.Equal(t, "0", act["__partition"])
423+
require.Equal(t, "123", act["__offset"])
424+
require.Equal(t, "TestTopic", act["__topic"])
425+
require.Equal(t, "value1", act["key1"])
426+
require.Equal(t, "value2", act["key2"])
427+
})
428+
429+
t.Run("no key", func(t *testing.T) {
430+
m := sarama.ConsumerMessage{
431+
Headers: nil, Timestamp: ts, Key: nil, Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
432+
}
433+
act := GetEventMetadata(&m)
434+
require.Len(t, act, 4)
435+
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
436+
require.Equal(t, "0", act["__partition"])
437+
require.Equal(t, "123", act["__offset"])
438+
require.Equal(t, "TestTopic", act["__topic"])
439+
})
440+
441+
t.Run("null message", func(t *testing.T) {
442+
act := GetEventMetadata(nil)
443+
require.Nil(t, act)
444+
})
445+
}

0 commit comments

Comments
 (0)