Skip to content

Commit eb53837

Browse files
zmoogclaude
authored andcommitted
azure-eventhub v2: fix processingTime input metric (#49864)
* azure-eventhub v2: fix processingTime input metric Move the processingTime metric update outside the loop in processReceivedEvents so it is recorded once per batch instead of once per event. Previously, because processingStartTime was set before the loop and never reset, each iteration recorded a cumulative duration: ~T, ~2T, ~3T, etc. This inflated the histogram sample count and skewed percentiles. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Update changelog * Fix forbidigo lint: replace logp.NewLogger with logp.L() in tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Switch to logptest --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> (cherry picked from commit 19da895)
1 parent 84140f0 commit eb53837

File tree

3 files changed

+110
-4
lines changed

3 files changed

+110
-4
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Fix internal processing time metric for azureeventhub input.
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: filebeat
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/beats/pull/40547
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

x-pack/filebeat/input/azureeventhub/v2_input.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,9 +561,11 @@ func (in *eventHubInputV2) processReceivedEvents(receivedEvents []*azeventhubs.R
561561

562562
// Update input metrics.
563563
in.metrics.processedMessages.Inc()
564-
in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())
565564
}
566565

566+
// Update input metrics.
567+
in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())
568+
567569
return nil
568570
}
569571

x-pack/filebeat/input/azureeventhub/v2_input_test.go

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,21 @@ import (
1111
"errors"
1212
"sync"
1313
"testing"
14+
"time"
1415

16+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
1517
"github.com/stretchr/testify/assert"
1618
"github.com/stretchr/testify/require"
1719

1820
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
1921
"github.com/elastic/beats/v7/libbeat/management/status"
20-
"github.com/elastic/elastic-agent-libs/logp"
22+
"github.com/elastic/elastic-agent-libs/logp/logptest"
2123
"github.com/elastic/elastic-agent-libs/monitoring"
2224
)
2325

2426
func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) {
25-
input, err := newEventHubInputV2(azureInputConfig{}, logp.NewLogger(inputName))
27+
logger := logptest.NewTestingLogger(t, inputName)
28+
input, err := newEventHubInputV2(azureInputConfig{}, logger)
2629
require.NoError(t, err)
2730

2831
eventHubInputV2, ok := input.(*eventHubInputV2)
@@ -38,7 +41,7 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) {
3841

3942
statusReporter := newMockStatusReporter()
4043
inputTestCtx := inputv2.Context{
41-
Logger: logp.NewLogger(inputName),
44+
Logger: logger,
4245
Cancelation: ctx,
4346
MetricsRegistry: monitoring.NewRegistry(),
4447
}
@@ -55,6 +58,75 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) {
5558
assert.Equal(t, status.Failed, statusReporter.statuses[2])
5659
}
5760

61+
func TestProcessReceivedEventsUpdatesProcessingTimeOnce(t *testing.T) {
62+
// This test verifies that processingTime is updated exactly once
63+
// per call to processReceivedEvents, regardless of the number of
64+
// events processed. Before the fix, processingTime was updated
65+
// inside the loop, resulting in N updates for N events.
66+
67+
inputConfig := azureInputConfig{
68+
EventHubName: "test-eventhub",
69+
ConsumerGroup: "test-consumer-group",
70+
}
71+
72+
logger := logptest.NewTestingLogger(t, inputName)
73+
metrics := newInputMetrics(monitoring.NewRegistry(), logger)
74+
75+
sanitizers, err := newSanitizers(inputConfig.Sanitizers, inputConfig.LegacySanitizeOptions)
76+
require.NoError(t, err)
77+
78+
input := &eventHubInputV2{
79+
config: inputConfig,
80+
log: logger,
81+
metrics: metrics,
82+
messageDecoder: messageDecoder{
83+
config: inputConfig,
84+
metrics: metrics,
85+
log: logger,
86+
sanitizers: sanitizers,
87+
},
88+
}
89+
90+
now := time.Now()
91+
partitionKey := "test-key"
92+
93+
// Create multiple received events so we can verify
94+
// processingTime is updated once, not per event.
95+
receivedEvents := []*azeventhubs.ReceivedEventData{
96+
{
97+
EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"one"}]}`)},
98+
EnqueuedTime: &now,
99+
PartitionKey: &partitionKey,
100+
Offset: 0,
101+
},
102+
{
103+
EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"two"}]}`)},
104+
EnqueuedTime: &now,
105+
PartitionKey: &partitionKey,
106+
Offset: 1,
107+
},
108+
{
109+
EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"three"}]}`)},
110+
EnqueuedTime: &now,
111+
PartitionKey: &partitionKey,
112+
Offset: 2,
113+
},
114+
}
115+
116+
fakeClient := &fakeClient{}
117+
118+
err = input.processReceivedEvents(receivedEvents, "0", fakeClient)
119+
require.NoError(t, err)
120+
121+
// Verify all 3 messages were processed.
122+
assert.Equal(t, uint64(3), metrics.processedMessages.Get())
123+
124+
// Verify processingTime was updated exactly once (after the loop),
125+
// not once per event (which was the bug).
126+
assert.Equal(t, 1, metrics.processingTime.Size(),
127+
"processingTime should be updated once per call, not once per event")
128+
}
129+
58130
// mockStatusReporter is a mock implementation of the status.Reporter interface.
59131
// It is used to verify that the input updates its status correctly.
60132
type mockStatusReporter struct {

0 commit comments

Comments
 (0)