Skip to content

Commit dedc4ea

Browse files
authored
[exporter/kafka] franz-go: Fix underreported kafka_exporter_write_latency metric (open-telemetry#43859)
#### Description franz-go: Fix underreported kafka_exporter_write_latency metric by using OnBrokerE2E franz-go hook that takes response read time into account <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#43803 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 590a226 commit dedc4ea

File tree

3 files changed

+60
-7
lines changed

3 files changed

+60
-7
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: exporter/kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "franz-go: Fix underreported kafka_exporter_write_latency metric"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43803]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/kafkaexporter/internal/kafkaclient/franzgo_metrics.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ func NewFranzProducerMetrics(tb *metadata.TelemetryBuilder) FranzProducerMetrics
2626
return FranzProducerMetrics{tb: tb}
2727
}
2828

29+
var _ kgo.HookBrokerConnect = FranzProducerMetrics{}
30+
2931
func (fpm FranzProducerMetrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
3032
outcome := "success"
3133
if err != nil {
@@ -41,6 +43,8 @@ func (fpm FranzProducerMetrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.
4143
)
4244
}
4345

46+
var _ kgo.HookBrokerDisconnect = FranzProducerMetrics{}
47+
4448
func (fpm FranzProducerMetrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
4549
fpm.tb.KafkaBrokerClosed.Add(
4650
context.Background(),
@@ -51,6 +55,8 @@ func (fpm FranzProducerMetrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ ne
5155
)
5256
}
5357

58+
var _ kgo.HookBrokerThrottle = FranzProducerMetrics{}
59+
5460
func (fpm FranzProducerMetrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
5561
// KafkaBrokerThrottlingDuration is deprecated in favor of KafkaBrokerThrottlingLatency.
5662
fpm.tb.KafkaBrokerThrottlingDuration.Record(
@@ -69,30 +75,34 @@ func (fpm FranzProducerMetrics) OnBrokerThrottle(meta kgo.BrokerMetadata, thrott
6975
)
7076
}
7177

72-
func (fpm FranzProducerMetrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, _ int, writeWait, timeToWrite time.Duration, err error) {
78+
var _ kgo.HookBrokerE2E = FranzProducerMetrics{}
79+
80+
func (fpm FranzProducerMetrics) OnBrokerE2E(meta kgo.BrokerMetadata, _ int16, e2e kgo.BrokerE2E) {
7381
outcome := "success"
74-
if err != nil {
82+
if e2e.Err() != nil {
7583
outcome = "failure"
7684
}
7785
// KafkaExporterLatency is deprecated in favor of KafkaExporterWriteLatency.
7886
fpm.tb.KafkaExporterLatency.Record(
7987
context.Background(),
80-
writeWait.Milliseconds()+timeToWrite.Milliseconds(),
88+
e2e.DurationE2E().Milliseconds()+e2e.WriteWait.Milliseconds(),
8189
metric.WithAttributes(
8290
attribute.String("node_id", kgo.NodeName(meta.NodeID)),
8391
attribute.String("outcome", outcome),
8492
),
8593
)
8694
fpm.tb.KafkaExporterWriteLatency.Record(
8795
context.Background(),
88-
writeWait.Seconds()+timeToWrite.Seconds(),
96+
e2e.DurationE2E().Seconds()+e2e.WriteWait.Seconds(),
8997
metric.WithAttributes(
9098
attribute.String("node_id", kgo.NodeName(meta.NodeID)),
9199
attribute.String("outcome", outcome),
92100
),
93101
)
94102
}
95103

104+
var _ kgo.HookProduceBatchWritten = FranzProducerMetrics{}
105+
96106
// OnProduceBatchWritten is called when a batch has been produced.
97107
// https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#HookProduceBatchWritten
98108
func (fpm FranzProducerMetrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, partition int32, m kgo.ProduceBatchMetrics) {
@@ -126,6 +136,8 @@ func (fpm FranzProducerMetrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, t
126136
)
127137
}
128138

139+
var _ kgo.HookProduceRecordUnbuffered = FranzProducerMetrics{}
140+
129141
// OnProduceRecordUnbuffered records the number of produced messages that were
130142
// not produced due to errors. The successfully produced records is recorded by
131143
// `OnProduceBatchWritten`.

exporter/kafkaexporter/internal/kafkaclient/franzgo_metrics_test.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,28 @@ func TestFranzProducerMetrics(t *testing.T) {
7979
metricdatatest.IgnoreTimestamp(),
8080
)
8181
})
82-
t.Run("should report the metrics when OnBrokerWrite hook is called", func(t *testing.T) {
82+
t.Run("should report the metrics when OnBrokerE2E hook is called", func(t *testing.T) {
8383
testTel := componenttest.NewTelemetry()
8484
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
8585
require.NoError(t, err)
8686
defer tb.Shutdown()
8787
fpm := NewFranzProducerMetrics(tb)
88-
fpm.OnBrokerWrite(kgo.BrokerMetadata{NodeID: 1}, 0, 0, time.Second/2, time.Second/2, nil)
89-
fpm.OnBrokerWrite(kgo.BrokerMetadata{NodeID: 1}, 0, 0, 100*time.Second, 0, errors.New(""))
88+
fpm.OnBrokerE2E(kgo.BrokerMetadata{NodeID: 1}, 0, kgo.BrokerE2E{
89+
WriteWait: time.Second / 4,
90+
TimeToWrite: time.Second / 4,
91+
ReadWait: time.Second / 4,
92+
TimeToRead: time.Second / 4,
93+
WriteErr: nil,
94+
ReadErr: nil,
95+
})
96+
fpm.OnBrokerE2E(kgo.BrokerMetadata{NodeID: 1}, 0, kgo.BrokerE2E{
97+
WriteWait: time.Second * 25,
98+
TimeToWrite: time.Second * 25,
99+
ReadWait: time.Second * 25,
100+
TimeToRead: time.Second * 25,
101+
WriteErr: errors.New(""),
102+
ReadErr: nil,
103+
})
90104
var rm metricdata.ResourceMetrics
91105
err = testTel.Reader.Collect(t.Context(), &rm)
92106
require.NoError(t, err)

0 commit comments

Comments
 (0)