Skip to content

Commit 34aace0

Browse files
marclopaxw
andauthored
kafkaexporter: add producer.linger config option (open-telemetry#44076)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adds a new configuration option to the Kafka exporter to control the linger time for the producer. Since `franz-go` now defaults to `10ms`, it's best to allow users to configure this option to suit their needs. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#44075 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tested <!--Describe the documentation added.--> #### Documentation Docs addded. --------- Signed-off-by: Marc Lopez Rubio <[email protected]> Co-authored-by: Andrew Wilkins <[email protected]>
1 parent 8c21bab commit 34aace0

File tree

6 files changed

+62
-0
lines changed

6 files changed

+62
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: exporter/kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adds a new configuration option to the Kafka exporter to control the linger time for the producer.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [44075]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Since `franz-go` now defaults to `10ms`, it's best to allow users to configure this option to suit their needs.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ The following settings can be optionally configured:
122122
No compression levels supported yet
123123
- `flush_max_messages` (default = 0) The maximum number of messages the producer will send in a single broker request.
124124
- `allow_auto_topic_creation` (default = true) whether the broker is allowed to automatically create topics when they are referenced but do not already exist.
125+
- `linger`: (default = `10ms`) How long individual topic partitions will linger waiting for more records before triggering a request to be built.
125126

126127
### Supported encodings
127128

internal/kafka/franz_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func NewFranzSyncProducer(ctx context.Context, clientCfg configkafka.ClientConfi
5858
// the legacy compatibility sarama hashing to avoid hashing to different
5959
// partitions in case partitioning is enabled.
6060
kgo.RecordPartitioner(newSaramaCompatPartitioner()),
61+
kgo.ProducerLinger(cfg.Linger),
6162
)...)
6263
if err != nil {
6364
return nil, err

pkg/kafka/configkafka/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ type ProducerConfig struct {
219219
// Whether or not to allow automatic topic creation.
220220
// (default enabled).
221221
AllowAutoTopicCreation bool `mapstructure:"allow_auto_topic_creation"`
222+
223+
// Linger controls the linger time for the producer.
224+
// (default 10ms).
225+
Linger time.Duration `mapstructure:"linger"`
222226
}
223227

224228
func NewDefaultProducerConfig() ProducerConfig {
@@ -228,6 +232,7 @@ func NewDefaultProducerConfig() ProducerConfig {
228232
Compression: "none",
229233
FlushMaxMessages: 0,
230234
AllowAutoTopicCreation: true,
235+
Linger: 10 * time.Millisecond,
231236
}
232237
}
233238

pkg/kafka/configkafka/config_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ func TestProducerConfig(t *testing.T) {
193193
},
194194
FlushMaxMessages: 2,
195195
AllowAutoTopicCreation: true,
196+
Linger: 10 * time.Millisecond,
196197
},
197198
},
198199
"default_compression_level": {
@@ -206,6 +207,7 @@ func TestProducerConfig(t *testing.T) {
206207
},
207208
FlushMaxMessages: 2,
208209
AllowAutoTopicCreation: true,
210+
Linger: 10 * time.Millisecond,
209211
},
210212
},
211213
"snappy_compression": {
@@ -214,6 +216,7 @@ func TestProducerConfig(t *testing.T) {
214216
RequiredAcks: 1,
215217
Compression: "snappy",
216218
AllowAutoTopicCreation: true,
219+
Linger: 10 * time.Millisecond,
217220
},
218221
},
219222
"disable_auto_topic_creation": {
@@ -222,6 +225,25 @@ func TestProducerConfig(t *testing.T) {
222225
RequiredAcks: 1,
223226
Compression: "none",
224227
AllowAutoTopicCreation: false,
228+
Linger: 10 * time.Millisecond,
229+
},
230+
},
231+
"producer_linger": {
232+
expected: ProducerConfig{
233+
MaxMessageBytes: 1000000,
234+
RequiredAcks: 1,
235+
Compression: "none",
236+
AllowAutoTopicCreation: true,
237+
Linger: 100 * time.Millisecond,
238+
},
239+
},
240+
"producer_linger_1s": {
241+
expected: ProducerConfig{
242+
MaxMessageBytes: 1000000,
243+
RequiredAcks: 1,
244+
Compression: "none",
245+
AllowAutoTopicCreation: true,
246+
Linger: 1 * time.Second,
225247
},
226248
},
227249
"invalid_compression_level": {

pkg/kafka/configkafka/testdata/producer_config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,9 @@ kafka/invalid_compression:
3131
compression: brotli
3232
kafka/invalid_required_acks:
3333
required_acks: 3
34+
35+
kafka/producer_linger:
36+
linger: 100ms
37+
38+
kafka/producer_linger_1s:
39+
linger: 1s

0 commit comments

Comments
 (0)