Skip to content

Commit 323e9bf

Browse files
authored
Add kafka configuration to pub/sub (dapr#3275)
Signed-off-by: yaron2 <[email protected]>
1 parent 425f9fc commit 323e9bf

File tree

4 files changed

+90
-5
lines changed

4 files changed

+90
-5
lines changed

common/component/kafka/kafka.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
7878
config := sarama.NewConfig()
7979
config.Version = meta.internalVersion
8080
config.Consumer.Offsets.Initial = k.initialOffset
81+
config.Consumer.Fetch.Min = meta.consumerFetchMin
82+
config.Consumer.Fetch.Default = meta.consumerFetchDefault
83+
config.ChannelBufferSize = meta.channelBufferSize
8184

8285
if meta.ClientID != "" {
8386
config.ClientID = meta.ClientID

common/component/kafka/metadata.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ const (
4141
mtlsAuthType = "mtls"
4242
awsIAMAuthType = "awsiam"
4343
noAuthType = "none"
44+
consumerFetchMin = "consumerFetchMin"
45+
consumerFetchDefault = "consumerFetchDefault"
46+
channelBufferSize = "channelBufferSize"
4447
)
4548

4649
type KafkaMetadata struct {
@@ -72,10 +75,13 @@ type KafkaMetadata struct {
7275
internalVersion sarama.KafkaVersion `mapstructure:"-"`
7376
internalOidcExtensions map[string]string `mapstructure:"-"`
7477
// aws iam auth profile
75-
AWSAccessKey string `mapstructure:"awsAccessKey"`
76-
AWSSecretKey string `mapstructure:"awsSecretKey"`
77-
AWSSessionToken string `mapstructure:"awsSessionToken"`
78-
AWSRegion string `mapstructure:"awsRegion"`
78+
AWSAccessKey string `mapstructure:"awsAccessKey"`
79+
AWSSecretKey string `mapstructure:"awsSecretKey"`
80+
AWSSessionToken string `mapstructure:"awsSessionToken"`
81+
AWSRegion string `mapstructure:"awsRegion"`
82+
channelBufferSize int `mapstructure:"-"`
83+
consumerFetchMin int32 `mapstructure:"-"`
84+
consumerFetchDefault int32 `mapstructure:"-"`
7985
}
8086

8187
// upgradeMetadata updates metadata properties based on deprecated usage.
@@ -119,6 +125,9 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
119125
m := KafkaMetadata{
120126
ConsumeRetryInterval: 100 * time.Millisecond,
121127
internalVersion: sarama.V2_0_0_0, //nolint:nosnakecase
128+
channelBufferSize: 256,
129+
consumerFetchMin: 1,
130+
consumerFetchDefault: 1024 * 1024,
122131
}
123132

124133
err := metadata.DecodeMetadata(meta, &m)
@@ -257,5 +266,32 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
257266
m.internalVersion = version
258267
}
259268

269+
if val, ok := meta[channelBufferSize]; ok && val != "" {
270+
v, err := strconv.Atoi(val)
271+
if err != nil {
272+
return nil, err
273+
}
274+
275+
m.channelBufferSize = v
276+
}
277+
278+
if val, ok := meta[consumerFetchDefault]; ok && val != "" {
279+
v, err := strconv.ParseInt(val, 10, 32)
280+
if err != nil {
281+
return nil, err
282+
}
283+
284+
m.consumerFetchDefault = int32(v)
285+
}
286+
287+
if val, ok := meta[consumerFetchMin]; ok && val != "" {
288+
v, err := strconv.ParseInt(val, 10, 32)
289+
if err != nil {
290+
return nil, err
291+
}
292+
293+
m.consumerFetchMin = int32(v)
294+
}
295+
260296
return &m, nil
261297
}

common/component/kafka/metadata_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ func getCompleteMetadata() map[string]string {
5757
clientKey: clientKeyMock,
5858
caCert: caCertMock,
5959
"consumeRetryInterval": "200",
60+
consumerFetchDefault: "1048576",
61+
consumerFetchMin: "1",
62+
channelBufferSize: "256",
6063
}
6164
}
6265

@@ -125,6 +128,9 @@ func assertMetadata(t *testing.T, meta *KafkaMetadata) {
125128
require.Equal(t, clientKeyMock, meta.TLSClientKey)
126129
require.Equal(t, caCertMock, meta.TLSCaCert)
127130
require.Equal(t, 200*time.Millisecond, meta.ConsumeRetryInterval)
131+
require.Equal(t, int32(1024*1024), meta.consumerFetchDefault)
132+
require.Equal(t, int32(1), meta.consumerFetchMin)
133+
require.Equal(t, 256, meta.channelBufferSize)
128134
}
129135

130136
func TestMissingBrokers(t *testing.T) {
@@ -361,3 +367,25 @@ func TestTls(t *testing.T) {
361367
require.Equal(t, "missing CA certificate property 'caCert' for authType 'certificate'", err.Error())
362368
})
363369
}
370+
371+
func TestMetadataConsumerFetchValues(t *testing.T) {
372+
k := getKafka()
373+
m := getCompleteMetadata()
374+
m["consumerFetchMin"] = "3"
375+
m["consumerFetchDefault"] = "2048"
376+
377+
meta, err := k.getKafkaMetadata(m)
378+
require.NoError(t, err)
379+
require.Equal(t, int32(3), meta.consumerFetchMin)
380+
require.Equal(t, int32(2048), meta.consumerFetchDefault)
381+
}
382+
383+
func TestMetadataChannelBufferSize(t *testing.T) {
384+
k := getKafka()
385+
m := getCompleteMetadata()
386+
m["channelBufferSize"] = "128"
387+
388+
meta, err := k.getKafkaMetadata(m)
389+
require.NoError(t, err)
390+
require.Equal(t, 128, meta.channelBufferSize)
391+
}

pubsub/kafka/metadata.yaml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,22 @@ metadata:
241241
This is potentially insecure and not recommended for use in production.
242242
example: "true"
243243
default: "false"
244-
type: bool
244+
type: bool
245+
- name: channelBufferSize
246+
type: number
247+
description: |
248+
The number of events to buffer in internal and external channels.
249+
example: '128'
250+
default: '256'
251+
- name: consumerFetchMin
252+
type: number
253+
description: |
254+
The minimum number of message bytes to fetch in a request.
255+
example: '4'
256+
default: '1'
257+
- name: consumerFetchDefault
258+
type: number
259+
description: |
260+
The default number of message bytes to fetch from the broker in each request.
261+
example: '2097152'
262+
default: '1048576'

0 commit comments

Comments
 (0)