Skip to content

Commit 972d108

Browse files
kevinconawayedenhill
authored andcommitted
Add support for go.delivery.report.fields
This configuration property will allow users to enable/disable certain fields of the delivery reports to reduce allocations. Only the key & value are currently supported
1 parent b6d25be commit 972d108

File tree

4 files changed

+170
-2
lines changed

4 files changed

+170
-2
lines changed

kafka/handle.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package kafka
1818

1919
import (
2020
"fmt"
21+
"strings"
2122
"sync"
2223
"time"
2324
"unsafe"
@@ -113,6 +114,9 @@ type handle struct {
113114
// Forward delivery reports on Producer.Events channel
114115
fwdDr bool
115116

117+
// Enabled fields for delivery reports
118+
msgFields *messageFields
119+
116120
//
117121
// consumer
118122
//
@@ -134,6 +138,9 @@ func (h *handle) setup() {
134138
h.rktNameCache = make(map[*C.rd_kafka_topic_t]string)
135139
h.cgomap = make(map[int]cgoif)
136140
h.name = C.GoString(C.rd_kafka_name(h.rk))
141+
if h.msgFields == nil {
142+
h.msgFields = newMessageFields()
143+
}
137144
}
138145

139146
func (h *handle) cleanup() {
@@ -323,3 +330,48 @@ func (h *handle) setOAuthBearerTokenFailure(errstr string) error {
323330
}
324331
return newError(cErr)
325332
}
333+
334+
// messageFields controls which fields are made available for producer delivery reports & incoming messages
335+
// true values indicate that the field should be included
336+
type messageFields struct {
337+
Key bool
338+
Value bool
339+
}
340+
341+
// disableAll disable all fields
342+
func (mf *messageFields) disableAll() {
343+
mf.Key = false
344+
mf.Value = false
345+
}
346+
347+
// newMessageFields returns a new messageFields with all fields enabled
348+
func newMessageFields() *messageFields {
349+
return &messageFields{
350+
Key: true,
351+
Value: true,
352+
}
353+
}
354+
355+
// newMessageFieldsFrom constructs a new messageFields from the given configuration value
356+
func newMessageFieldsFrom(v ConfigValue) (*messageFields, error) {
357+
msgFields := newMessageFields()
358+
switch v {
359+
case "all":
360+
// nothing to do
361+
case "", "none":
362+
msgFields.disableAll()
363+
default:
364+
msgFields.disableAll()
365+
for _, value := range strings.Split(v.(string), ",") {
366+
switch value {
367+
case "key":
368+
msgFields.Key = true
369+
case "value":
370+
msgFields.Value = true
371+
default:
372+
return nil, fmt.Errorf("unknown message field: %s", value)
373+
}
374+
}
375+
}
376+
return msgFields, nil
377+
}

kafka/message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
135135
msg.TopicPartition.Topic = &topic
136136
}
137137
msg.TopicPartition.Partition = int32(cmsg.partition)
138-
if cmsg.payload != nil {
138+
if cmsg.payload != nil && h.msgFields.Value {
139139
msg.Value = C.GoBytes(unsafe.Pointer(cmsg.payload), C.int(cmsg.len))
140140
}
141-
if cmsg.key != nil {
141+
if cmsg.key != nil && h.msgFields.Key {
142142
msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len))
143143
}
144144
msg.TopicPartition.Offset = Offset(cmsg.offset)

kafka/producer.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,8 @@ func (p *Producer) Purge(flags int) error {
423423
// Note: timestamps and headers are not supported with this interface.
424424
// go.delivery.reports (bool, true) - Forward per-message delivery reports to the
425425
// Events() channel.
426+
// go.delivery.report.fields (string, all) - Comma separated list of fields to enable for delivery reports.
427+
// Allowed values: all, none (or empty string), key, value
426428
// go.events.channel.size (int, 1000000) - Events().
427429
// go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
428430
// go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
@@ -461,6 +463,17 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
461463
}
462464
p.handle.fwdDr = v.(bool)
463465

466+
v, err = confCopy.extract("go.delivery.report.fields", "all")
467+
if err != nil {
468+
return nil, err
469+
}
470+
471+
msgFields, err := newMessageFieldsFrom(v)
472+
if err != nil {
473+
return nil, err
474+
}
475+
p.handle.msgFields = msgFields
476+
464477
v, err = confCopy.extract("go.events.channel.size", 1000000)
465478
if err != nil {
466479
return nil, err

kafka/producer_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kafka
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/binary"
2223
"encoding/json"
@@ -563,3 +564,105 @@ func TestTransactionalAPI(t *testing.T) {
563564

564565
p.Close()
565566
}
567+
568+
// TestProducerDeliveryReportFields tests the `go.delivery.report.fields` config setting
569+
func TestProducerDeliveryReportFields(t *testing.T) {
570+
t.Run("none", func(t *testing.T) {
571+
runProducerDeliveryReportFieldTest(t, &ConfigMap{
572+
"socket.timeout.ms": 10,
573+
"message.timeout.ms": 10,
574+
"go.delivery.report.fields": "",
575+
}, func(expected, actual *Message) {
576+
if len(actual.Key) > 0 {
577+
t.Errorf("key should not be set")
578+
}
579+
if len(actual.Value) > 0 {
580+
t.Errorf("value should not be set")
581+
}
582+
if s, ok := actual.Opaque.(*string); ok {
583+
if *s != *(expected.Opaque.(*string)) {
584+
t.Errorf("Opaque should be \"%v\", not \"%v\"", expected.Opaque, actual.Opaque)
585+
}
586+
} else {
587+
t.Errorf("opaque value should be a string, not \"%v\"", actual.Opaque)
588+
}
589+
})
590+
})
591+
t.Run("single", func(t *testing.T) {
592+
runProducerDeliveryReportFieldTest(t, &ConfigMap{
593+
"socket.timeout.ms": 10,
594+
"message.timeout.ms": 10,
595+
"go.delivery.report.fields": "key",
596+
}, func(expected, actual *Message) {
597+
if !bytes.Equal(expected.Key, actual.Key) {
598+
t.Errorf("key should be \"%s\", not \"%s\"", expected.Key, actual.Key)
599+
}
600+
if len(actual.Value) > 0 {
601+
t.Errorf("value should not be set")
602+
}
603+
if s, ok := actual.Opaque.(*string); ok {
604+
if *s != *(expected.Opaque.(*string)) {
605+
t.Errorf("Opaque should be \"%v\", not \"%v\"", expected.Opaque, actual.Opaque)
606+
}
607+
} else {
608+
t.Errorf("opaque value should be a string, not \"%v\"", actual.Opaque)
609+
}
610+
})
611+
})
612+
t.Run("multiple", func(t *testing.T) {
613+
runProducerDeliveryReportFieldTest(t, &ConfigMap{
614+
"socket.timeout.ms": 10,
615+
"message.timeout.ms": 10,
616+
"go.delivery.report.fields": "key,value",
617+
}, func(expected, actual *Message) {
618+
if !bytes.Equal(expected.Key, actual.Key) {
619+
t.Errorf("key should be \"%s\", not \"%s\"", expected.Key, actual.Key)
620+
}
621+
if !bytes.Equal(expected.Value, actual.Value) {
622+
t.Errorf("value should be \"%s\", not \"%s\"", expected.Value, actual.Value)
623+
}
624+
if s, ok := actual.Opaque.(*string); ok {
625+
if *s != *(expected.Opaque.(*string)) {
626+
t.Errorf("Opaque should be \"%v\", not \"%v\"", expected.Opaque, actual.Opaque)
627+
}
628+
} else {
629+
t.Errorf("opaque value should be a string, not \"%v\"", actual.Opaque)
630+
}
631+
})
632+
})
633+
}
634+
635+
func runProducerDeliveryReportFieldTest(t *testing.T, config *ConfigMap, fn func(expected, actual *Message)) {
636+
p, err := NewProducer(config)
637+
if err != nil {
638+
t.Fatalf("%s", err)
639+
}
640+
641+
topic1 := "gotest"
642+
643+
myOpq := "My opaque"
644+
expected := &Message{
645+
TopicPartition: TopicPartition{Topic: &topic1, Partition: 0},
646+
Opaque: &myOpq,
647+
Value: []byte("ProducerChannel"),
648+
Key: []byte("This is my key"),
649+
}
650+
p.ProduceChannel() <- expected
651+
652+
// We should expect a single message and possibly events
653+
for msgCnt := 0; msgCnt < 1; {
654+
ev := <-p.Events()
655+
switch e := ev.(type) {
656+
case *Message:
657+
msgCnt++
658+
fn(expected, e)
659+
default:
660+
t.Logf("Ignored event %s", e)
661+
}
662+
}
663+
664+
r := p.Flush(2000)
665+
if r > 0 {
666+
t.Errorf("Expected empty queue after Flush, still has %d", r)
667+
}
668+
}

0 commit comments

Comments
 (0)