Skip to content

Commit 5fe038d

Browse files
authored
kafka: expose batching configuration (#3621)
1 parent 1b2eec9 commit 5fe038d

File tree

1 file changed

+31
-6
lines changed

1 file changed

+31
-6
lines changed

pkg/acquisition/modules/kafka/kafka.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ var linesRead = prometheus.NewCounterVec(
3333
[]string{"topic"})
3434

3535
type KafkaConfiguration struct {
36-
Brokers []string `yaml:"brokers"`
37-
Topic string `yaml:"topic"`
38-
GroupID string `yaml:"group_id"`
39-
Partition int `yaml:"partition"`
40-
Timeout string `yaml:"timeout"`
41-
TLS *TLSConfig `yaml:"tls"`
36+
Brokers []string `yaml:"brokers"`
37+
Topic string `yaml:"topic"`
38+
GroupID string `yaml:"group_id"`
39+
Partition int `yaml:"partition"`
40+
Timeout string `yaml:"timeout"`
41+
TLS *TLSConfig `yaml:"tls"`
42+
BatchConfiguration KafkaBatchConfiguration `yaml:"batch"`
4243
configuration.DataSourceCommonCfg `yaml:",inline"`
4344
}
4445

@@ -49,6 +50,14 @@ type TLSConfig struct {
4950
CaCert string `yaml:"ca_cert"`
5051
}
5152

53+
type KafkaBatchConfiguration struct {
54+
BatchMinBytes int `yaml:"min_bytes"`
55+
BatchMaxBytes int `yaml:"max_bytes"`
56+
BatchMaxWait time.Duration `yaml:"max_wait"`
57+
BatchQueueSize int `yaml:"queue_size"`
58+
CommitInterval time.Duration `yaml:"commit_interval"`
59+
}
60+
5261
type KafkaSource struct {
5362
metricsLevel int
5463
Config KafkaConfiguration
@@ -294,6 +303,22 @@ func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer, logger *log.Entry)
294303
logger.Warnf("no group_id specified, crowdsec will only read from the 1st partition of the topic")
295304
}
296305

306+
if kc.BatchConfiguration.BatchMinBytes != 0 {
307+
rConf.MinBytes = kc.BatchConfiguration.BatchMinBytes
308+
}
309+
if kc.BatchConfiguration.BatchMaxBytes != 0 {
310+
rConf.MaxBytes = kc.BatchConfiguration.BatchMaxBytes
311+
}
312+
if kc.BatchConfiguration.BatchMaxWait != 0 {
313+
rConf.MaxWait = kc.BatchConfiguration.BatchMaxWait
314+
}
315+
if kc.BatchConfiguration.BatchQueueSize != 0 {
316+
rConf.QueueCapacity = kc.BatchConfiguration.BatchQueueSize
317+
}
318+
if kc.BatchConfiguration.CommitInterval != 0 {
319+
rConf.CommitInterval = kc.BatchConfiguration.CommitInterval
320+
}
321+
297322
if err := rConf.Validate(); err != nil {
298323
return &kafka.Reader{}, fmt.Errorf("while validating reader configuration: %w", err)
299324
}

0 commit comments

Comments
 (0)