Skip to content

Commit a487870

Browse files
passuied and yaron2 authored
Implement ability to exclude metadata items from being converted to/from headers (#3874)
Signed-off-by: Patrick Assuied <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent e72dbb6 commit a487870

File tree

8 files changed

+183
-13
lines changed

8 files changed

+183
-13
lines changed

bindings/kafka/metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,3 +372,10 @@ metadata:
372372
- "range"
373373
- "sticky"
374374
- "roundrobin"
375+
- name: excludeHeaderMetaRegex
376+
type: string
377+
required: false
378+
description: |
379+
A regular expression matching the keys that must not be converted between Kafka headers and metadata (in either direction), to avoid unwanted downstream side effects.
380+
example: '"^(rawPayload|valueSchemaType)$"'
381+
default: '""'

common/component/kafka/consumer.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
145145

146146
for i, message := range messages {
147147
if message != nil {
148-
metadata := GetEventMetadata(message, consumer.k.escapeHeaders)
148+
metadata := GetEventMetadata(message, consumer.k)
149149
handlerConfig, err := consumer.k.GetTopicHandlerConfig(message.Topic)
150150
if err != nil {
151151
return err
@@ -205,7 +205,7 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
205205
Topic: message.Topic,
206206
Data: messageVal,
207207
}
208-
event.Metadata = GetEventMetadata(message, consumer.k.escapeHeaders)
208+
event.Metadata = GetEventMetadata(message, consumer.k)
209209

210210
err = handlerConfig.Handler(session.Context(), &event)
211211
if err == nil {
@@ -214,11 +214,11 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
214214
return err
215215
}
216216

217-
func GetEventMetadata(message *sarama.ConsumerMessage, escapeHeaders bool) map[string]string {
217+
func GetEventMetadata(message *sarama.ConsumerMessage, kafka *Kafka) map[string]string {
218218
if message != nil {
219219
metadata := make(map[string]string, len(message.Headers)+5)
220220
if message.Key != nil {
221-
if escapeHeaders {
221+
if kafka.escapeHeaders {
222222
metadata[keyMetadataKey] = url.QueryEscape(string(message.Key))
223223
} else {
224224
metadata[keyMetadataKey] = string(message.Key)
@@ -229,7 +229,12 @@ func GetEventMetadata(message *sarama.ConsumerMessage, escapeHeaders bool) map[s
229229
metadata[timestampMetadataKey] = strconv.FormatInt(message.Timestamp.UnixMilli(), 10)
230230
metadata[partitionMetadataKey] = strconv.FormatInt(int64(message.Partition), 10)
231231
for _, header := range message.Headers {
232-
if escapeHeaders {
232+
// skip headers that are excluded from metadata
233+
if kafka.excludeHeaderMetaRegex != nil && kafka.excludeHeaderMetaRegex.MatchString(string(header.Key)) {
234+
kafka.logger.Debugf("Skipping header %v that is excluded from metadata", string(header.Key))
235+
continue
236+
}
237+
if kafka.escapeHeaders {
233238
metadata[string(header.Key)] = url.QueryEscape(string(header.Value))
234239
} else {
235240
metadata[string(header.Key)] = string(header.Value)

common/component/kafka/kafka.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/binary"
2020
"errors"
2121
"fmt"
22+
"regexp"
2223
"strings"
2324
"sync"
2425
"sync/atomic"
@@ -81,6 +82,8 @@ type Kafka struct {
8182
DefaultConsumeRetryEnabled bool
8283
consumeRetryEnabled bool
8384
consumeRetryInterval time.Duration
85+
86+
excludeHeaderMetaRegex *regexp.Regexp
8487
}
8588

8689
type SchemaType int
@@ -256,6 +259,10 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
256259
return errors.New("component is closed")
257260
}
258261

262+
if meta.ExcludeHeaderMetaRegex != "" {
263+
k.excludeHeaderMetaRegex = regexp.MustCompile(meta.ExcludeHeaderMetaRegex)
264+
}
265+
259266
k.logger.Debug("Kafka message bus initialization complete")
260267

261268
return nil

common/component/kafka/metadata.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ type KafkaMetadata struct {
116116
SchemaRegistryAPISecret string `mapstructure:"schemaRegistryAPISecret"`
117117
SchemaCachingEnabled bool `mapstructure:"schemaCachingEnabled"`
118118
SchemaLatestVersionCacheTTL time.Duration `mapstructure:"schemaLatestVersionCacheTTL"`
119+
120+
// ExcludeHeaderMetaRegex is a regular expression; keys matching it are excluded from header/metadata conversion in both directions.
121+
ExcludeHeaderMetaRegex string `mapstructure:"excludeHeaderMetaRegex"`
119122
}
120123

121124
// upgradeMetadata updates metadata properties based on deprecated usage.
@@ -168,6 +171,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
168171
SchemaCachingEnabled: true,
169172
SchemaLatestVersionCacheTTL: 5 * time.Minute,
170173
EscapeHeaders: false,
174+
ExcludeHeaderMetaRegex: "",
171175
}
172176

173177
err := metadata.DecodeMetadata(meta, &m)
@@ -347,5 +351,9 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
347351
m.ClientConnectionKeepAliveInterval = defaultClientConnectionKeepAliveInterval
348352
}
349353

354+
if val, ok := meta["excludeHeaderMetaRegex"]; ok && val != "" {
355+
m.ExcludeHeaderMetaRegex = val
356+
}
357+
350358
return &m, nil
351359
}

common/component/kafka/metadata_test.go

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package kafka
1616
import (
1717
"fmt"
1818
"net/url"
19+
"regexp"
1920
"strconv"
2021
"testing"
2122
"time"
@@ -513,7 +514,11 @@ func TestGetEventMetadata(t *testing.T) {
513514
m := sarama.ConsumerMessage{
514515
Headers: nil, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
515516
}
516-
act := GetEventMetadata(&m, false)
517+
k := Kafka{
518+
escapeHeaders: false,
519+
logger: logger.NewLogger("kafka_test"),
520+
}
521+
act := GetEventMetadata(&m, &k)
517522
require.Len(t, act, 5)
518523
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
519524
require.Equal(t, "MyKey", act["__key"])
@@ -530,7 +535,11 @@ func TestGetEventMetadata(t *testing.T) {
530535
m := sarama.ConsumerMessage{
531536
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
532537
}
533-
act := GetEventMetadata(&m, false)
538+
k := Kafka{
539+
escapeHeaders: false,
540+
logger: logger.NewLogger("kafka_test"),
541+
}
542+
act := GetEventMetadata(&m, &k)
534543
require.Len(t, act, 7)
535544
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
536545
require.Equal(t, "MyKey", act["__key"])
@@ -545,7 +554,11 @@ func TestGetEventMetadata(t *testing.T) {
545554
m := sarama.ConsumerMessage{
546555
Headers: nil, Timestamp: ts, Key: nil, Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
547556
}
548-
act := GetEventMetadata(&m, false)
557+
k := Kafka{
558+
escapeHeaders: false,
559+
logger: logger.NewLogger("kafka_test"),
560+
}
561+
act := GetEventMetadata(&m, &k)
549562
require.Len(t, act, 4)
550563
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
551564
require.Equal(t, "0", act["__partition"])
@@ -554,7 +567,11 @@ func TestGetEventMetadata(t *testing.T) {
554567
})
555568

556569
t.Run("null message", func(t *testing.T) {
557-
act := GetEventMetadata(nil, false)
570+
k := Kafka{
571+
escapeHeaders: false,
572+
logger: logger.NewLogger("kafka_test"),
573+
}
574+
act := GetEventMetadata(nil, &k)
558575
require.Nil(t, act)
559576
})
560577

@@ -565,7 +582,11 @@ func TestGetEventMetadata(t *testing.T) {
565582
m := sarama.ConsumerMessage{
566583
Headers: nil, Timestamp: ts, Key: []byte(keyValue), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
567584
}
568-
act := GetEventMetadata(&m, true)
585+
k := Kafka{
586+
escapeHeaders: true,
587+
logger: logger.NewLogger("kafka_test"),
588+
}
589+
act := GetEventMetadata(&m, &k)
569590
require.Equal(t, escapedKeyValue, act[keyMetadataKey])
570591
})
571592

@@ -575,7 +596,11 @@ func TestGetEventMetadata(t *testing.T) {
575596
m := sarama.ConsumerMessage{
576597
Headers: nil, Timestamp: ts, Key: []byte(keyValue), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
577598
}
578-
act := GetEventMetadata(&m, false)
599+
k := Kafka{
600+
escapeHeaders: false,
601+
logger: logger.NewLogger("kafka_test"),
602+
}
603+
act := GetEventMetadata(&m, &k)
579604
require.Equal(t, keyValue, act[keyMetadataKey])
580605
})
581606

@@ -590,7 +615,11 @@ func TestGetEventMetadata(t *testing.T) {
590615
m := sarama.ConsumerMessage{
591616
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
592617
}
593-
act := GetEventMetadata(&m, true)
618+
k := Kafka{
619+
escapeHeaders: true,
620+
logger: logger.NewLogger("kafka_test"),
621+
}
622+
act := GetEventMetadata(&m, &k)
594623
require.Len(t, act, 6)
595624
require.Equal(t, escapedHeaderValue, act[headerKey])
596625
})
@@ -605,8 +634,50 @@ func TestGetEventMetadata(t *testing.T) {
605634
m := sarama.ConsumerMessage{
606635
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
607636
}
608-
act := GetEventMetadata(&m, false)
637+
k := Kafka{
638+
escapeHeaders: false,
639+
logger: logger.NewLogger("kafka_test"),
640+
}
641+
act := GetEventMetadata(&m, &k)
609642
require.Len(t, act, 6)
610643
require.Equal(t, headerValue, act[headerKey])
611644
})
645+
646+
t.Run("header with excluded key gets removed from metadata", func(t *testing.T) {
647+
headers := []*sarama.RecordHeader{
648+
{Key: []byte("valueSchemaType"), Value: []byte("Avro")},
649+
{Key: []byte("rawPayload"), Value: []byte("true")},
650+
}
651+
k := Kafka{
652+
escapeHeaders: false,
653+
excludeHeaderMetaRegex: regexp.MustCompile("^valueSchemaType$"),
654+
logger: logger.NewLogger("kafka_test"),
655+
}
656+
m := sarama.ConsumerMessage{
657+
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
658+
}
659+
act := GetEventMetadata(&m, &k)
660+
require.Len(t, act, 6)
661+
require.NotContains(t, act, "valueSchemaType")
662+
require.Contains(t, act, "rawPayload")
663+
})
664+
665+
t.Run("header with excluded multiple keys gets removed from metadata", func(t *testing.T) {
666+
headers := []*sarama.RecordHeader{
667+
{Key: []byte("valueSchemaType"), Value: []byte("Avro")},
668+
{Key: []byte("rawPayload"), Value: []byte("true")},
669+
}
670+
k := Kafka{
671+
escapeHeaders: false,
672+
excludeHeaderMetaRegex: regexp.MustCompile("^valueSchemaType|rawPayload$"),
673+
logger: logger.NewLogger("kafka_test"),
674+
}
675+
m := sarama.ConsumerMessage{
676+
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
677+
}
678+
act := GetEventMetadata(&m, &k)
679+
require.Len(t, act, 5)
680+
require.NotContains(t, act, "valueSchemaType")
681+
require.NotContains(t, act, "rawPayload")
682+
})
612683
}

common/component/kafka/producer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata m
7878
if msg.Headers == nil {
7979
msg.Headers = make([]sarama.RecordHeader, 0, len(metadata))
8080
}
81+
// skip metadata that is excluded from headers
82+
if k.excludeHeaderMetaRegex != nil && k.excludeHeaderMetaRegex.MatchString(name) {
83+
k.logger.Debugf("Skipping metadata %v that is excluded from headers", name)
84+
continue
85+
}
8186
msg.Headers = append(msg.Headers, sarama.RecordHeader{
8287
Key: []byte(name),
8388
Value: []byte(value),
@@ -141,6 +146,11 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu
141146
if msg.Headers == nil {
142147
msg.Headers = make([]sarama.RecordHeader, 0, len(metadata))
143148
}
149+
// skip metadata that is excluded from headers
150+
if k.excludeHeaderMetaRegex != nil && k.excludeHeaderMetaRegex.MatchString(name) {
151+
k.logger.Debugf("Skipping metadata %v that is excluded from headers", name)
152+
continue
153+
}
144154
msg.Headers = append(msg.Headers, sarama.RecordHeader{
145155
Key: []byte(name),
146156
Value: []byte(value),

common/component/kafka/producer_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kafka
22

33
import (
4+
"regexp"
45
"testing"
56

67
"github.com/IBM/sarama"
@@ -94,6 +95,30 @@ func TestPublish(t *testing.T) {
9495
// assert
9596
require.NoError(t, err)
9697
})
98+
99+
t.Run("produce message with excluded headers", func(t *testing.T) {
100+
// arrange
101+
metadataIn := map[string]string{
102+
"a": "a",
103+
"b": "bVal",
104+
"c": "cVal",
105+
"__key": "key",
106+
}
107+
108+
metadataOut := map[string]string{
109+
"a": "a",
110+
"__key": "key",
111+
}
112+
messageAsserter := createMessageAsserter(t, sarama.StringEncoder("key"), metadataOut)
113+
k := arrangeKafkaWithAssertions(t, messageAsserter)
114+
k.excludeHeaderMetaRegex = regexp.MustCompile("^b|c$")
115+
116+
// act
117+
err := k.Publish(ctx, "a", []byte("a"), metadataIn)
118+
119+
// assert
120+
require.NoError(t, err)
121+
})
97122
}
98123

99124
func TestBulkPublish(t *testing.T) {
@@ -188,4 +213,34 @@ func TestBulkPublish(t *testing.T) {
188213
// assert
189214
require.NoError(t, err)
190215
})
216+
217+
t.Run("bulk produce messages with excluded headers", func(t *testing.T) {
218+
// arrange
219+
entries := []pubsub.BulkMessageEntry{
220+
{
221+
EntryId: "0",
222+
Event: []byte("a"),
223+
ContentType: "a",
224+
Metadata: map[string]string{"__key": "key", "a": "a", "b": "b", "c": "c"},
225+
},
226+
{
227+
EntryId: "0",
228+
Event: []byte("a"),
229+
ContentType: "a",
230+
Metadata: map[string]string{"c": "c"},
231+
},
232+
}
233+
messageAsserters := []saramamocks.MessageChecker{
234+
createMessageAsserter(t, sarama.StringEncoder("key"), map[string]string{"__key": "key", "common": "common", "a": "a"}),
235+
createMessageAsserter(t, nil, map[string]string{"common": "common"}),
236+
}
237+
k := arrangeKafkaWithAssertions(t, messageAsserters...)
238+
k.excludeHeaderMetaRegex = regexp.MustCompile("^b|c$")
239+
240+
// act
241+
_, err := k.BulkPublish(ctx, "a", entries, metadata)
242+
243+
// assert
244+
require.NoError(t, err)
245+
})
191246
}

pubsub/kafka/metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,3 +363,10 @@ metadata:
363363
- "range"
364364
- "sticky"
365365
- "roundrobin"
366+
- name: excludeHeaderMetaRegex
367+
type: string
368+
required: false
369+
description: |
370+
A regular expression matching the keys that must not be converted between Kafka headers and metadata (in either direction), to avoid unwanted downstream side effects.
371+
example: '"^(rawPayload|valueSchemaType)$"'
372+
default: '""'

0 commit comments

Comments
 (0)