From 00cc13bc3c9bb5ab746bc1ff041ea175cf2073a2 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Wed, 1 Apr 2026 22:14:07 +0200 Subject: [PATCH 1/4] azure-eventhub v2: fix processingTime input metric Move the processingTime metric update outside the loop in processReceivedEvents so it is recorded once per batch instead of once per event. Previously, because processingStartTime was set before the loop and never reset, each iteration recorded a cumulative duration: ~T, ~2T, ~3T, etc. This inflated the histogram sample count and skewed percentiles. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../filebeat/input/azureeventhub/v2_input.go | 4 +- .../input/azureeventhub/v2_input_test.go | 71 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureeventhub/v2_input.go b/x-pack/filebeat/input/azureeventhub/v2_input.go index fd5577aa2f6b..35b4f376a3c7 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input.go @@ -561,9 +561,11 @@ func (in *eventHubInputV2) processReceivedEvents(receivedEvents []*azeventhubs.R // Update input metrics. in.metrics.processedMessages.Inc() - in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) } + // Update input metrics. + in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) + return nil } diff --git a/x-pack/filebeat/input/azureeventhub/v2_input_test.go b/x-pack/filebeat/input/azureeventhub/v2_input_test.go index 6caba05a40a8..da2db5b6022f 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input_test.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input_test.go @@ -11,7 +11,9 @@ import ( "errors" "sync" "testing" + "time" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -55,6 +57,75 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { assert.Equal(t, status.Failed, statusReporter.statuses[2]) } +func TestProcessReceivedEventsUpdatesProcessingTimeOnce(t *testing.T) { + // This test verifies that processingTime is updated exactly once + // per call to processReceivedEvents, regardless of the number of + // events processed. Before the fix, processingTime was updated + // inside the loop, resulting in N updates for N events. + + inputConfig := azureInputConfig{ + EventHubName: "test-eventhub", + ConsumerGroup: "test-consumer-group", + } + + log := logp.NewLogger(inputName) + metrics := newInputMetrics(monitoring.NewRegistry(), log) + + sanitizers, err := newSanitizers(inputConfig.Sanitizers, inputConfig.LegacySanitizeOptions) + require.NoError(t, err) + + input := &eventHubInputV2{ + config: inputConfig, + log: log, + metrics: metrics, + messageDecoder: messageDecoder{ + config: inputConfig, + metrics: metrics, + log: log, + sanitizers: sanitizers, + }, + } + + now := time.Now() + partitionKey := "test-key" + + // Create multiple received events so we can verify + // processingTime is updated once, not per event. + receivedEvents := []*azeventhubs.ReceivedEventData{ + { + EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"one"}]}`)}, + EnqueuedTime: &now, + PartitionKey: &partitionKey, + Offset: 0, + }, + { + EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"two"}]}`)}, + EnqueuedTime: &now, + PartitionKey: &partitionKey, + Offset: 1, + }, + { + EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"three"}]}`)}, + EnqueuedTime: &now, + PartitionKey: &partitionKey, + Offset: 2, + }, + } + + fakeClient := &fakeClient{} + + err = input.processReceivedEvents(receivedEvents, "0", fakeClient) + require.NoError(t, err) + + // Verify all 3 messages were processed. + assert.Equal(t, uint64(3), metrics.processedMessages.Get()) + + // Verify processingTime was updated exactly once (after the loop), + // not once per event (which was the bug). + assert.Equal(t, 1, metrics.processingTime.Size(), + "processingTime should be updated once per call, not once per event") +} + // mockStatusReporter is a mock implementation of the status.Reporter interface. // It is used to verify that the input updates its status correctly. type mockStatusReporter struct { From 32e21ed9e1ffaeb4847ca90df0465d2e30e6e34a Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Wed, 1 Apr 2026 22:19:00 +0200 Subject: [PATCH 2/4] Update changelog --- ...1775074566-fix-processing-time-metric.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1775074566-fix-processing-time-metric.yaml diff --git a/changelog/fragments/1775074566-fix-processing-time-metric.yaml b/changelog/fragments/1775074566-fix-processing-time-metric.yaml new file mode 100644 index 000000000000..9bfa61d6de75 --- /dev/null +++ b/changelog/fragments/1775074566-fix-processing-time-metric.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Fix internal processing time metric for azureeventhub input. + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/beats/pull/40547 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 From e4d1bf5e190230ab3870cbbeeea1efeacc19b7c6 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 2 Apr 2026 09:45:05 +0200 Subject: [PATCH 3/4] Fix forbidigo lint: replace logp.NewLogger with logp.L() in tests Co-Authored-By: Claude Opus 4.6 (1M context) --- x-pack/filebeat/input/azureeventhub/v2_input_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/v2_input_test.go b/x-pack/filebeat/input/azureeventhub/v2_input_test.go index da2db5b6022f..538c4021bcb2 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input_test.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input_test.go @@ -24,7 +24,7 @@ import ( ) func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { - input, err := newEventHubInputV2(azureInputConfig{}, logp.NewLogger(inputName)) + input, err := newEventHubInputV2(azureInputConfig{}, logp.L()) require.NoError(t, err) eventHubInputV2, ok := input.(*eventHubInputV2) @@ -40,7 +40,7 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { statusReporter := newMockStatusReporter() inputTestCtx := inputv2.Context{ - Logger: logp.NewLogger(inputName), + Logger: logp.L(), Cancelation: ctx, MetricsRegistry: monitoring.NewRegistry(), } @@ -68,7 +68,7 @@ func TestProcessReceivedEventsUpdatesProcessingTimeOnce(t *testing.T) { ConsumerGroup: "test-consumer-group", } - log := logp.NewLogger(inputName) + log := logp.L() metrics := newInputMetrics(monitoring.NewRegistry(), log) sanitizers, err := newSanitizers(inputConfig.Sanitizers, inputConfig.LegacySanitizeOptions) From da1a48855d246960762fddf47b0e26197ae6eca0 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 2 Apr 2026 11:28:13 +0200 Subject: [PATCH 4/4] Switch to logptest --- .../filebeat/input/azureeventhub/v2_input_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/v2_input_test.go b/x-pack/filebeat/input/azureeventhub/v2_input_test.go index 538c4021bcb2..365ee5f81d97 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input_test.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input_test.go @@ -19,12 +19,13 @@ import ( inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/management/status" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/monitoring" ) func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { - input, err := newEventHubInputV2(azureInputConfig{}, logp.L()) + logger := logptest.NewTestingLogger(t, inputName) + input, err := newEventHubInputV2(azureInputConfig{}, logger) require.NoError(t, err) eventHubInputV2, ok := input.(*eventHubInputV2) @@ -40,7 +41,7 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { statusReporter := newMockStatusReporter() inputTestCtx := inputv2.Context{ - Logger: logp.L(), + Logger: logger, Cancelation: ctx, MetricsRegistry: monitoring.NewRegistry(), } @@ -68,20 +69,20 @@ func TestProcessReceivedEventsUpdatesProcessingTimeOnce(t *testing.T) { ConsumerGroup: "test-consumer-group", } - log := logp.L() - metrics := newInputMetrics(monitoring.NewRegistry(), log) + logger := logptest.NewTestingLogger(t, inputName) + metrics := newInputMetrics(monitoring.NewRegistry(), logger) sanitizers, err := newSanitizers(inputConfig.Sanitizers, inputConfig.LegacySanitizeOptions) require.NoError(t, err) input := &eventHubInputV2{ config: inputConfig, - log: log, + log: logger, metrics: metrics, messageDecoder: messageDecoder{ config: inputConfig, metrics: metrics, - log: log, + log: logger, sanitizers: sanitizers, }, }