Skip to content

Commit 294dd75

Browse files
Gallardot, JoshVanL, and yaron2
authored
feat: kafka subpub and bindings support compression (#3676)
Signed-off-by: Gallardot <[email protected]> Co-authored-by: Josh van Leeuwen <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent 35b77e0 commit 294dd75

File tree

6 files changed

+65
-2
lines changed

6 files changed

+65
-2
lines changed

bindings/kafka/metadata.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,12 @@ metadata:
352352
It allows sending headers with special characters that are usually not allowed in HTTP headers.
353353
example: "true"
354354
default: "false"
355+
- name: compression
356+
type: string
357+
required: false
358+
description: |
359+
Enables message compression.
360+
There are five types of compression available: none, gzip, snappy, lz4, and zstd.
361+
The default is none.
362+
example: '"gzip"'
363+
default: "none"

common/component/kafka/kafka.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
154154
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
155155
config.ChannelBufferSize = meta.channelBufferSize
156156

157+
config.Producer.Compression = meta.internalCompression
158+
157159
config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
158160
config.Metadata.RefreshFrequency = meta.ClientConnectionTopicMetadataRefreshInterval
159161

common/component/kafka/metadata.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ const (
3838
certificateAuthType = "certificate"
3939
clientCert = "clientCert"
4040
clientKey = "clientKey"
41-
consumeRetryEnabled = "consumeRetryEnabled"
4241
consumeRetryInterval = "consumeRetryInterval"
4342
authType = "authType"
4443
passwordAuthType = "password"
@@ -50,6 +49,7 @@ const (
5049
consumerFetchDefault = "consumerFetchDefault"
5150
channelBufferSize = "channelBufferSize"
5251
valueSchemaType = "valueSchemaType"
52+
compression = "compression"
5353

5454
// Kafka client config default values.
5555
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
@@ -102,6 +102,10 @@ type KafkaMetadata struct {
102102
consumerFetchMin int32 `mapstructure:"-"`
103103
consumerFetchDefault int32 `mapstructure:"-"`
104104

105+
// configs for kafka producer
106+
Compression string `mapstructure:"compression"`
107+
internalCompression sarama.CompressionCodec `mapstructure:"-"`
108+
105109
// schema registry
106110
SchemaRegistryURL string `mapstructure:"schemaRegistryURL"`
107111
SchemaRegistryAPIKey string `mapstructure:"schemaRegistryAPIKey"`
@@ -149,6 +153,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
149153
ConsumeRetryEnabled: k.DefaultConsumeRetryEnabled,
150154
ConsumeRetryInterval: 100 * time.Millisecond,
151155
internalVersion: sarama.V2_0_0_0, //nolint:nosnakecase
156+
internalCompression: sarama.CompressionNone,
152157
channelBufferSize: 256,
153158
consumerFetchMin: 1,
154159
consumerFetchDefault: 1024 * 1024,
@@ -294,6 +299,14 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
294299
m.internalVersion = version
295300
}
296301

302+
if m.Compression != "" {
303+
compression, err := parseCompression(m.Compression)
304+
if err != nil {
305+
return nil, err
306+
}
307+
m.internalCompression = compression
308+
}
309+
297310
if val, ok := meta[channelBufferSize]; ok && val != "" {
298311
v, err := strconv.Atoi(val)
299312
if err != nil {

common/component/kafka/metadata_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,18 +397,21 @@ func TestMetadataProducerValues(t *testing.T) {
397397
require.NoError(t, err)
398398
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
399399
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
400+
require.Equal(t, sarama.CompressionNone, meta.internalCompression)
400401
})
401402

402403
t.Run("setting producer values explicitly", func(t *testing.T) {
403404
k := getKafka()
404405
m := getCompleteMetadata()
405406
m[clientConnectionTopicMetadataRefreshInterval] = "3m0s"
406407
m[clientConnectionKeepAliveInterval] = "4m0s"
408+
m[compression] = "gzip"
407409

408410
meta, err := k.getKafkaMetadata(m)
409411
require.NoError(t, err)
410412
require.Equal(t, 3*time.Minute, meta.ClientConnectionTopicMetadataRefreshInterval)
411413
require.Equal(t, 4*time.Minute, meta.ClientConnectionKeepAliveInterval)
414+
require.Equal(t, sarama.CompressionGZIP, meta.internalCompression)
412415
})
413416

414417
t.Run("setting producer invalid values so defaults take over", func(t *testing.T) {
@@ -422,6 +425,17 @@ func TestMetadataProducerValues(t *testing.T) {
422425
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
423426
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
424427
})
428+
429+
t.Run("setting producer invalid compression value", func(t *testing.T) {
430+
k := getKafka()
431+
m := getCompleteMetadata()
432+
m[compression] = "invalid"
433+
434+
meta, err := k.getKafkaMetadata(m)
435+
require.Error(t, err)
436+
require.Nil(t, meta)
437+
require.Equal(t, "kafka error: invalid compression: invalid", err.Error())
438+
})
425439
}
426440

427441
func TestMetadataChannelBufferSize(t *testing.T) {

common/component/kafka/utils.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,22 @@ func parseInitialOffset(value string) (initialOffset int64, err error) {
5454
return initialOffset, err
5555
}
5656

57+
// parseCompression parses the compression codec from the given string.
58+
// If the string is empty, it returns the default compression codec.
59+
// If the string is not empty, it returns the parsed compression codec.
60+
// If the string is not empty and not a valid compression codec, it returns an error.
61+
// Supported compression codecs are: none, gzip, snappy, lz4, zstd.
62+
func parseCompression(value string) (compression sarama.CompressionCodec, err error) {
63+
compression = sarama.CompressionNone // Default
64+
if value != "" {
65+
unmarshalErr := compression.UnmarshalText([]byte(value))
66+
if unmarshalErr != nil {
67+
return sarama.CompressionNone, fmt.Errorf("kafka error: invalid compression: %s", value)
68+
}
69+
}
70+
return compression, err
71+
}
72+
5773
// isValidPEM validates the provided input has PEM formatted block.
5874
func isValidPEM(val string) bool {
5975
block, _ := pem.Decode([]byte(val))
@@ -64,7 +80,7 @@ func isValidPEM(val string) bool {
6480
// TopicHandlerConfig is the map of topics and sruct containing handler and their config.
6581
type TopicHandlerConfig map[string]SubscriptionHandlerConfig
6682

67-
// // TopicList returns the list of topics
83+
// TopicList returns the list of topics
6884
func (tbh TopicHandlerConfig) TopicList() []string {
6985
topics := make([]string, len(tbh))
7086
i := 0

pubsub/kafka/metadata.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,3 +343,12 @@ metadata:
343343
It allows sending headers with special characters that are usually not allowed in HTTP headers.
344344
example: "true"
345345
default: "false"
346+
- name: compression
347+
type: string
348+
required: false
349+
description: |
350+
Enables message compression.
351+
There are five types of compression available: none, gzip, snappy, lz4, and zstd.
352+
The default is none.
353+
example: '"gzip"'
354+
default: "none"

0 commit comments

Comments (0)