diff --git a/changelog/fragments/1775074566-fix-processing-time-metric.yaml b/changelog/fragments/1775074566-fix-processing-time-metric.yaml new file mode 100644 index 000000000000..9bfa61d6de75 --- /dev/null +++ b/changelog/fragments/1775074566-fix-processing-time-metric.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Fix internal processing time metric for azureeventhub input. + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/beats/pull/40547 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/x-pack/filebeat/input/azureeventhub/v2_input.go b/x-pack/filebeat/input/azureeventhub/v2_input.go index fd5577aa2f6b..35b4f376a3c7 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input.go @@ -561,9 +561,11 @@ func (in *eventHubInputV2) processReceivedEvents(receivedEvents []*azeventhubs.R // Update input metrics. in.metrics.processedMessages.Inc() - in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) } + // Update input metrics. + in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) + return nil } diff --git a/x-pack/filebeat/input/azureeventhub/v2_input_test.go b/x-pack/filebeat/input/azureeventhub/v2_input_test.go index 6caba05a40a8..365ee5f81d97 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input_test.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input_test.go @@ -11,18 +11,21 @@ import ( "errors" "sync" "testing" + "time" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/management/status" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/monitoring" ) func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { - input, err := newEventHubInputV2(azureInputConfig{}, logp.NewLogger(inputName)) + logger := logptest.NewTestingLogger(t, inputName) + input, err := newEventHubInputV2(azureInputConfig{}, logger) require.NoError(t, err) eventHubInputV2, ok := input.(*eventHubInputV2) @@ -38,7 +41,7 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { statusReporter := newMockStatusReporter() inputTestCtx := inputv2.Context{ - Logger: logp.NewLogger(inputName), + Logger: logger, Cancelation: ctx, MetricsRegistry: monitoring.NewRegistry(), } @@ -55,6 +58,75 @@ func TestRunUpdatesStatusToStartingAndFailed(t *testing.T) { assert.Equal(t, status.Failed, statusReporter.statuses[2]) } +func TestProcessReceivedEventsUpdatesProcessingTimeOnce(t *testing.T) { + // This test verifies that processingTime is updated exactly once + // per call to processReceivedEvents, regardless of the number of + // events processed. Before the fix, processingTime was updated + // inside the loop, resulting in N updates for N events. + + inputConfig := azureInputConfig{ + EventHubName: "test-eventhub", + ConsumerGroup: "test-consumer-group", + } + + logger := logptest.NewTestingLogger(t, inputName) + metrics := newInputMetrics(monitoring.NewRegistry(), logger) + + sanitizers, err := newSanitizers(inputConfig.Sanitizers, inputConfig.LegacySanitizeOptions) + require.NoError(t, err) + + input := &eventHubInputV2{ + config: inputConfig, + log: logger, + metrics: metrics, + messageDecoder: messageDecoder{ + config: inputConfig, + metrics: metrics, + log: logger, + sanitizers: sanitizers, + }, + } + + now := time.Now() + partitionKey := "test-key" + + // Create multiple received events so we can verify + // processingTime is updated once, not per event. + receivedEvents := []*azeventhubs.ReceivedEventData{ + { + EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"one"}]}`)}, + EnqueuedTime: &now, + PartitionKey: &partitionKey, + Offset: 0, + }, + { + EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"two"}]}`)}, + EnqueuedTime: &now, + PartitionKey: &partitionKey, + Offset: 1, + }, + { + EventData: azeventhubs.EventData{Body: []byte(`{"records":[{"msg":"three"}]}`)}, + EnqueuedTime: &now, + PartitionKey: &partitionKey, + Offset: 2, + }, + } + + fakeClient := &fakeClient{} + + err = input.processReceivedEvents(receivedEvents, "0", fakeClient) + require.NoError(t, err) + + // Verify all 3 messages were processed. + assert.Equal(t, uint64(3), metrics.processedMessages.Get()) + + // Verify processingTime was updated exactly once (after the loop), + // not once per event (which was the bug). + assert.Equal(t, 1, metrics.processingTime.Size(), + "processingTime should be updated once per call, not once per event") +} + // mockStatusReporter is a mock implementation of the status.Reporter interface. // It is used to verify that the input updates its status correctly. type mockStatusReporter struct {