From d721bd43faf2c077bd2ceda907248e4d49b25d91 Mon Sep 17 00:00:00 2001 From: William Easton Date: Mon, 30 Mar 2026 08:47:54 -0500 Subject: [PATCH 1/2] perf: remove unnecessary allocations in hot-path processors Remove wasteful Clone() calls and defer allocations in docker metadata, kubernetes metadata, timestamp, and publisher/processing processors. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...755443-remove-unnecessary-allocations.yaml | 5 + libbeat/beat/event_test.go | 2 +- .../add_docker_metadata.go | 2 +- .../add_docker_metadata_benchmark_test.go | 71 ++++ .../annotator_run_test.go | 343 ++++++++++++++++++ .../add_kubernetes_metadata/benchmark_test.go | 100 +++++ .../add_kubernetes_metadata/kubernetes.go | 34 +- libbeat/processors/timestamp/timestamp.go | 30 +- .../timestamp/timestamp_benchmark_test.go | 78 ++++ libbeat/publisher/processing/processors.go | 1 - 10 files changed, 638 insertions(+), 28 deletions(-) create mode 100644 changelog/fragments/1774755443-remove-unnecessary-allocations.yaml create mode 100644 libbeat/processors/add_docker_metadata/add_docker_metadata_benchmark_test.go create mode 100644 libbeat/processors/add_kubernetes_metadata/annotator_run_test.go create mode 100644 libbeat/processors/add_kubernetes_metadata/benchmark_test.go create mode 100644 libbeat/processors/timestamp/timestamp_benchmark_test.go diff --git a/changelog/fragments/1774755443-remove-unnecessary-allocations.yaml b/changelog/fragments/1774755443-remove-unnecessary-allocations.yaml new file mode 100644 index 000000000000..cf35c76488f4 --- /dev/null +++ b/changelog/fragments/1774755443-remove-unnecessary-allocations.yaml @@ -0,0 +1,5 @@ +kind: enhancement +summary: Remove unnecessary allocations in hot-path processors (docker metadata, kubernetes metadata, dissect, timestamp) +component: libbeat +pr: + - https://github.com/elastic/beats/pull/49761 diff --git a/libbeat/beat/event_test.go b/libbeat/beat/event_test.go index 3e6daa91e139..24ffb87dfa3e 100644 --- a/libbeat/beat/event_test.go +++ b/libbeat/beat/event_test.go @@ -129,7 +129,7 @@ func TestEvent(t *testing.T) { event := &Event{} require.NotPanics(t, func() { s := event.String() - require.JSONEq(t, `{"@metadata":{},"@timestamp":"0001-01-01T00:00:00Z"}`, s) + require.Equal(t, `{"@metadata":{},"@timestamp":"0001-01-01T00:00:00Z"}`, s) }) }) }) diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index c68dd93ed546..5b21307c6b06 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -218,7 +218,7 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) { _, _ = meta.Put("container.id", container.ID) _, _ = meta.Put("container.image.name", container.Image) _, _ = meta.Put("container.name", container.Name) - event.Fields.DeepUpdate(meta.Clone()) + event.Fields.DeepUpdate(meta) } else { d.log.Debugf("Container not found: cid=%s", cid) } diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_benchmark_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_benchmark_test.go new file mode 100644 index 000000000000..0e835c7723d1 --- /dev/null +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_benchmark_test.go @@ -0,0 +1,71 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build (linux || darwin || windows) && !integration + +package add_docker_metadata + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-autodiscover/docker" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func BenchmarkAddDockerMetadata(b *testing.B) { + cfg, err := conf.NewConfigFrom(map[string]interface{}{ + "match_fields": []string{"container.id"}, + }) + if err != nil { + b.Fatal(err) + } + + p, err := buildDockerMetadataProcessor(logptest.NewTestingLogger(b, ""), cfg, MockWatcherFactory( + map[string]*docker.Container{ + "abc123": { + ID: "abc123def456", + Image: "myrepo/myimage:latest", + Name: "my-container", + Labels: map[string]string{ + "app": "myapp", + "version": "v1.2.3", + "env": "production", + }, + }, + }, nil)) + if err != nil { + b.Fatal(err) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &beat.Event{ + Fields: mapstr.M{ + "container": mapstr.M{"id": "abc123"}, + "message": "test log line", + }, + } + _, err := p.Run(event) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/libbeat/processors/add_kubernetes_metadata/annotator_run_test.go b/libbeat/processors/add_kubernetes_metadata/annotator_run_test.go new file mode 100644 index 000000000000..ef0327d4ee96 --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/annotator_run_test.go @@ -0,0 +1,343 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build linux || darwin || windows + +package add_kubernetes_metadata + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// newAnnotatorForTest builds a kubernetesAnnotator with a pre-populated cache +// (no network calls). The matcher looks up events by "container.id". +func newAnnotatorForTest(t *testing.T, cacheKey string, meta mapstr.M) *kubernetesAnnotator { + t.Helper() + + cfg := config.MustNewConfigFrom(map[string]interface{}{ + "lookup_fields": []string{"container.id"}, + }) + matcher, err := NewFieldMatcher(*cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + + processor := &kubernetesAnnotator{ + log: logptest.NewTestingLogger(t, selector), + cache: newCache(10 * time.Second), + matchers: &Matchers{ + matchers: []Matcher{matcher}, + }, + kubernetesAvailable: true, + } + processor.cache.set(cacheKey, meta) + return processor +} + +// baseEvent returns an event that will match cacheKey via container.id. +func baseEvent(containerID string) *beat.Event { + return &beat.Event{ + Fields: mapstr.M{ + "container": mapstr.M{ + "id": containerID, + }, + }, + } +} + +// TestAnnotatorRunFullContainerMetadata verifies the primary split behaviour: +// OCI container field gets id/runtime/image.name but NOT name or raw image; +// kubernetes field gets container.name but NOT id/runtime/image. +func TestAnnotatorRunFullContainerMetadata(t *testing.T) { + meta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + "container": mapstr.M{ + "name": "mycontainer", + "image": "myimage:latest", + "id": "abc123", + "runtime": "containerd", + }, + }, + } + processor := newAnnotatorForTest(t, "abc123", meta) + + event, err := processor.Run(baseEvent("abc123")) + require.NoError(t, err) + + // --- OCI container field --- + containerRaw, err := event.Fields.GetValue("container") + require.NoError(t, err, "event.Fields[\"container\"] must be set") + container, ok := containerRaw.(mapstr.M) + require.True(t, ok, "container must be a mapstr.M") + + assert.Equal(t, "abc123", container["id"], "container.id should be set") + assert.Equal(t, "containerd", container["runtime"], "container.runtime should be set") + + imageRaw, err := container.GetValue("image") + require.NoError(t, err, "container.image must be set") + imageMap, ok := imageRaw.(mapstr.M) + require.True(t, ok, "container.image must be a mapstr.M") + assert.Equal(t, "myimage:latest", imageMap["name"], "container.image.name should match original image value") + + assert.NotContains(t, container, "name", "container must NOT have a 'name' key") + _, hasRawImage := container["image"].(string) + assert.False(t, hasRawImage, "container.image must not be a raw string") + + // --- kubernetes field --- + k8sRaw, err := event.Fields.GetValue("kubernetes") + require.NoError(t, err, "event.Fields[\"kubernetes\"] must be set") + k8s, ok := k8sRaw.(mapstr.M) + require.True(t, ok) + + k8sContainerRaw, err := k8s.GetValue("container") + require.NoError(t, err, "kubernetes.container must be present") + k8sContainer, ok := k8sContainerRaw.(mapstr.M) + require.True(t, ok) + + assert.Equal(t, "mycontainer", k8sContainer["name"], "kubernetes.container.name should be kept") + assert.NotContains(t, k8sContainer, "id", "kubernetes.container must NOT have id") + assert.NotContains(t, k8sContainer, "runtime", "kubernetes.container must NOT have runtime") + assert.NotContains(t, k8sContainer, "image", "kubernetes.container must NOT have image") +} + +// TestAnnotatorRunContainerWithoutImage verifies that when there is no image in +// the metadata, the OCI container field has id and runtime but no image key. +func TestAnnotatorRunContainerWithoutImage(t *testing.T) { + meta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + "container": mapstr.M{ + "name": "mycontainer", + "id": "abc456", + "runtime": "docker", + }, + }, + } + processor := newAnnotatorForTest(t, "abc456", meta) + + event, err := processor.Run(baseEvent("abc456")) + require.NoError(t, err) + + containerRaw, err := event.Fields.GetValue("container") + require.NoError(t, err) + container, ok := containerRaw.(mapstr.M) + require.True(t, ok) + + assert.Equal(t, "abc456", container["id"]) + assert.Equal(t, "docker", container["runtime"]) + assert.NotContains(t, container, "image", "container must NOT have image key when no image in metadata") +} + +// TestAnnotatorRunContainerWithoutName verifies that missing container.name +// does not panic and the OCI container field still has id and image.name. +func TestAnnotatorRunContainerWithoutName(t *testing.T) { + meta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + "container": mapstr.M{ + "image": "busybox:latest", + "id": "abc789", + }, + }, + } + processor := newAnnotatorForTest(t, "abc789", meta) + + event, err := processor.Run(baseEvent("abc789")) + require.NoError(t, err) + + containerRaw, err := event.Fields.GetValue("container") + require.NoError(t, err) + container, ok := containerRaw.(mapstr.M) + require.True(t, ok) + + assert.Equal(t, "abc789", container["id"]) + imageRaw, err := container.GetValue("image") + require.NoError(t, err, "container.image must be set") + imageMap, ok := imageRaw.(mapstr.M) + require.True(t, ok) + assert.Equal(t, "busybox:latest", imageMap["name"]) +} + +// TestAnnotatorRunNoContainerSubMap verifies that when the metadata has no +// kubernetes.container key at all, the OCI container field is not created and +// the kubernetes field is correctly populated. +func TestAnnotatorRunNoContainerSubMap(t *testing.T) { + meta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{ + "name": "mypod", + "uid": "uid-001", + }, + }, + } + + // Use pod.name as the lookup field since there's no container sub-map. + cfg := config.MustNewConfigFrom(map[string]interface{}{ + "lookup_fields": []string{"pod.name"}, + }) + matcher, err := NewFieldMatcher(*cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + + processor := &kubernetesAnnotator{ + log: logptest.NewTestingLogger(t, selector), + cache: newCache(10 * time.Second), + matchers: &Matchers{ + matchers: []Matcher{matcher}, + }, + kubernetesAvailable: true, + } + processor.cache.set("mypod", meta) + + event, err := processor.Run(&beat.Event{ + Fields: mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + }, + }) + require.NoError(t, err) + + // OCI container field should NOT be set. + _, containerErr := event.Fields.GetValue("container") + assert.Error(t, containerErr, "event.Fields[\"container\"] must NOT be set when there is no kubernetes.container") + + // kubernetes field should be present and correct. + k8sRaw, err := event.Fields.GetValue("kubernetes") + require.NoError(t, err, "event.Fields[\"kubernetes\"] must be set") + k8s, ok := k8sRaw.(mapstr.M) + require.True(t, ok) + + podRaw, err := k8s.GetValue("pod") + require.NoError(t, err) + pod, ok := podRaw.(mapstr.M) + require.True(t, ok) + assert.Equal(t, "mypod", pod["name"]) +} + +// TestAnnotatorRunExtraContainerFieldsPreserved verifies that unknown extra +// fields in kubernetes.container are forwarded to the OCI container field. +func TestAnnotatorRunExtraContainerFieldsPreserved(t *testing.T) { + meta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + "container": mapstr.M{ + "name": "mycontainer", + "image": "myimage:v1", + "id": "xtra001", + "runtime": "containerd", + "custom_field": "extra", + }, + }, + } + processor := newAnnotatorForTest(t, "xtra001", meta) + + event, err := processor.Run(baseEvent("xtra001")) + require.NoError(t, err) + + containerRaw, err := event.Fields.GetValue("container") + require.NoError(t, err) + container, ok := containerRaw.(mapstr.M) + require.True(t, ok) + + assert.Equal(t, "extra", container["custom_field"], "extra container fields must be preserved in OCI container") +} + +// TestAnnotatorRunCacheNotMutated verifies that running the processor multiple +// times on different events does not mutate the cached metadata entry. +func TestAnnotatorRunCacheNotMutated(t *testing.T) { + originalMeta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + "container": mapstr.M{ + "name": "mycontainer", + "image": "myimage:v2", + "id": "cache001", + "runtime": "containerd", + }, + }, + } + processor := newAnnotatorForTest(t, "cache001", originalMeta) + + // Run three times. + for i := 0; i < 3; i++ { + _, err := processor.Run(baseEvent("cache001")) + require.NoError(t, err) + } + + // Inspect the cache directly. + cached := processor.cache.get("cache001") + require.NotNil(t, cached, "cache entry must still exist") + + k8sRaw, err := cached.GetValue("kubernetes") + require.NoError(t, err) + k8s, ok := k8sRaw.(mapstr.M) + require.True(t, ok) + + k8sContainerRaw, err := k8s.GetValue("container") + require.NoError(t, err, "kubernetes.container must still be in cache") + k8sContainer, ok := k8sContainerRaw.(mapstr.M) + require.True(t, ok) + + assert.Equal(t, "mycontainer", k8sContainer["name"], "cache must still have container.name") + assert.Equal(t, "myimage:v2", k8sContainer["image"], "cache must still have container.image as a raw string") + assert.Equal(t, "cache001", k8sContainer["id"], "cache must still have container.id") + assert.Equal(t, "containerd", k8sContainer["runtime"], "cache must still have container.runtime") +} + +// TestAnnotatorRunEventIndependence verifies that mutating the container field +// on the result of one Run() call does not affect the result of a subsequent call. +func TestAnnotatorRunEventIndependence(t *testing.T) { + meta := mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{"name": "mypod"}, + "container": mapstr.M{ + "name": "mycontainer", + "image": "myimage:v3", + "id": "indep001", + "runtime": "containerd", + }, + }, + } + processor := newAnnotatorForTest(t, "indep001", meta) + + event1, err := processor.Run(baseEvent("indep001")) + require.NoError(t, err) + + event2, err := processor.Run(baseEvent("indep001")) + require.NoError(t, err) + + // Mutate event1's container field. + containerRaw1, err := event1.Fields.GetValue("container") + require.NoError(t, err) + container1, ok := containerRaw1.(mapstr.M) + require.True(t, ok) + container1["injected"] = "mutation" + + // event2's container field must be unaffected. + containerRaw2, err := event2.Fields.GetValue("container") + require.NoError(t, err) + container2, ok := containerRaw2.(mapstr.M) + require.True(t, ok) + + assert.NotContains(t, container2, "injected", "mutating first result must not affect second result") +} diff --git a/libbeat/processors/add_kubernetes_metadata/benchmark_test.go b/libbeat/processors/add_kubernetes_metadata/benchmark_test.go new file mode 100644 index 000000000000..d9ce37a98f4d --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/benchmark_test.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build linux || darwin || windows + +package add_kubernetes_metadata + +import ( + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func BenchmarkKubernetesAnnotatorRun(b *testing.B) { + cfg := config.MustNewConfigFrom(map[string]interface{}{ + "lookup_fields": []string{"container.id"}, + }) + matcher, err := NewFieldMatcher(*cfg, logptest.NewTestingLogger(b, "")) + if err != nil { + b.Fatal(err) + } + + processor := &kubernetesAnnotator{ + log: logptest.NewTestingLogger(b, selector), + cache: newCache(10 * time.Second), + matchers: &Matchers{ + matchers: []Matcher{matcher}, + }, + kubernetesAvailable: true, + } + + const cacheKey = "abc123container" + + processor.cache.set(cacheKey, mapstr.M{ + "kubernetes": mapstr.M{ + "pod": mapstr.M{ + "name": "test-pod", + "uid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "namespace": "default", + "labels": mapstr.M{ + "app": "myapp", + "version": "v1.2.3", + "env": "production", + }, + "annotations": mapstr.M{ + "deployment.kubernetes.io/revision": "3", + }, + }, + "node": mapstr.M{ + "name": "node-1", + }, + "namespace": "default", + "container": mapstr.M{ + "name": "mycontainer", + "image": "myrepo/myimage:latest", + "id": cacheKey, + "runtime": "containerd", + }, + }, + }) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Construct a minimal event with only the lookup field — no Clone() overhead + // counted against the benchmark. Run() will add kubernetes.* and container.* + // fields to this fresh event. + event := &beat.Event{ + Fields: mapstr.M{ + "container": mapstr.M{ + "id": cacheKey, + }, + "message": "some log line", + }, + } + _, err := processor.Run(event) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 85740dbc0afc..5fef49f2d184 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -351,22 +351,28 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { return event, nil } - metaClone := metadata.Clone() - _ = metaClone.Delete("kubernetes.container.name") - containerImage, err := metadata.GetValue("kubernetes.container.image") - if err == nil { - _ = metaClone.Delete("kubernetes.container.image") - _, _ = metaClone.Put("kubernetes.container.image.name", containerImage) - } - cmeta, err := metaClone.Clone().GetValue("kubernetes.container") - if err == nil { - event.Fields.DeepUpdate(mapstr.M{ - "container": cmeta, - }) + // One full clone for the kubernetes field; one cheap sub-map clone for the OCI + // container field. This replaces the original three full clones. + kubeMeta := metadata.Clone() + + // Build the OCI container field by cloning only the container sub-map — + // much cheaper than cloning the full metadata. Transform it in place: + // drop container.name and rewrite container.image -> container.image.name. + if containerVal, err := kubeMeta.GetValue("kubernetes.container"); err == nil { + if cm, ok := containerVal.(mapstr.M); ok { + ociContainer := cm.Clone() + _ = ociContainer.Delete("name") + if img, imgErr := ociContainer.GetValue("image"); imgErr == nil { + _ = ociContainer.Delete("image") + ociContainer["image"] = mapstr.M{"name": img} + } + event.Fields.DeepUpdate(mapstr.M{"container": ociContainer}) + } } - kubeMeta := metadata.Clone() - // remove container meta from kubernetes.container.* + // Remove container fields that belong only in the OCI section before writing + // kubernetes metadata to the event. container.name is intentionally kept here + // to match original behaviour. _ = kubeMeta.Delete("kubernetes.container.id") _ = kubeMeta.Delete("kubernetes.container.runtime") _ = kubeMeta.Delete("kubernetes.container.image") diff --git a/libbeat/processors/timestamp/timestamp.go b/libbeat/processors/timestamp/timestamp.go index 59671f7f244d..9802a2218c49 100644 --- a/libbeat/processors/timestamp/timestamp.go +++ b/libbeat/processors/timestamp/timestamp.go @@ -127,32 +127,40 @@ func (p *processor) tryToTime(value interface{}) (time.Time, error) { } func (p *processor) parseValue(v interface{}) (time.Time, error) { - detailedErr := &parseError{} - + // Try each layout, returning on first success. The parseError and cause + // list are only allocated when all layouts fail, keeping the success path + // allocation-free. + var causes []error for _, layout := range p.Layouts { ts, err := p.parseValueByLayout(v, layout) if err == nil { return ts, nil } - var parseError *time.ParseError - if errors.As(err, &parseError) { - detailedErr.causes = append(detailedErr.causes, &parseErrorCause{parseError}) + var pe *time.ParseError + if errors.As(err, &pe) { + causes = append(causes, &parseErrorCause{pe}) } else { - detailedErr.causes = append(detailedErr.causes, err) + causes = append(causes, err) } } - detailedErr.field = p.Field - detailedErr.time = v + return p.parseFailure(v, &parseError{ + field: p.Field, + time: v, + causes: causes, + }) +} +// parseFailure logs the error (if debug is enabled) and returns it. +func (p *processor) parseFailure(v interface{}, err error) (time.Time, error) { if p.isDebug { if p.IgnoreFailure { - p.log.Debugw("(Ignored) Failure parsing time field.", "error", detailedErr) + p.log.Debugw("(Ignored) Failure parsing time field.", "error", err) } else { - p.log.Debugw("Failure parsing time field.", "error", detailedErr) + p.log.Debugw("Failure parsing time field.", "error", err) } } - return time.Time{}, detailedErr + return time.Time{}, err } func (p *processor) parseValueByLayout(v interface{}, layout string) (time.Time, error) { diff --git a/libbeat/processors/timestamp/timestamp_benchmark_test.go b/libbeat/processors/timestamp/timestamp_benchmark_test.go new file mode 100644 index 000000000000..3aa2fb53e7c3 --- /dev/null +++ b/libbeat/processors/timestamp/timestamp_benchmark_test.go @@ -0,0 +1,78 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package timestamp + +import ( + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// BenchmarkTimestampSingleLayout measures the common case: one layout that +// matches every event. This is the hot path in most filebeat deployments. +func BenchmarkTimestampSingleLayout(b *testing.B) { + c := defaultConfig() + c.Field = "ts" + c.Layouts = []string{time.RFC3339Nano} + + p, err := newFromConfig(c, logptest.NewTestingLogger(b, "")) + if err != nil { + b.Fatal(err) + } + + tsStr := time.Date(2025, 3, 7, 11, 6, 39, 123456789, time.UTC).Format(time.RFC3339Nano) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &beat.Event{Fields: mapstr.M{"ts": tsStr}} + _, err := p.Run(event) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkTimestampMultipleLayouts measures the case where multiple layouts +// are configured and the matching layout is the last one tried. +func BenchmarkTimestampMultipleLayouts(b *testing.B) { + c := defaultConfig() + c.Field = "ts" + c.Layouts = []string{time.ANSIC, time.RFC822, time.RFC3339Nano} + + p, err := newFromConfig(c, logptest.NewTestingLogger(b, "")) + if err != nil { + b.Fatal(err) + } + + // Use RFC3339Nano format so the first two layouts fail. + tsStr := time.Date(2025, 3, 7, 11, 6, 39, 123456789, time.UTC).Format(time.RFC3339Nano) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &beat.Event{Fields: mapstr.M{"ts": tsStr}} + _, err := p.Run(event) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index 89110c4b063b..91044c4f2f15 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -180,7 +180,6 @@ func addMeta(event *beat.Event, meta mapstr.M) { if event.Meta == nil { event.Meta = meta } else { - event.Meta.Clone() event.Meta.DeepUpdate(meta) } } From edf0d3277b8ea56163d3f3d2fa431d4504c01104 Mon Sep 17 00:00:00 2001 From: strawgate Date: Mon, 30 Mar 2026 19:02:15 -0500 Subject: [PATCH 2/2] fix: use require.JSONEq for JSON comparison (testifylint) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- libbeat/beat/event_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/beat/event_test.go b/libbeat/beat/event_test.go index 24ffb87dfa3e..3e6daa91e139 100644 --- a/libbeat/beat/event_test.go +++ b/libbeat/beat/event_test.go @@ -129,7 +129,7 @@ func TestEvent(t *testing.T) { event := &Event{} require.NotPanics(t, func() { s := event.String() - require.Equal(t, `{"@metadata":{},"@timestamp":"0001-01-01T00:00:00Z"}`, s) + require.JSONEq(t, `{"@metadata":{},"@timestamp":"0001-01-01T00:00:00Z"}`, s) }) }) })