diff --git a/CHANGELOG.md b/CHANGELOG.md index 3eff87a349d..4e69c1f0123 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ The next release will require at least [Go 1.24]. ### Fixed - Fix `go.opentelemetry.io/otel/exporters/prometheus` to deduplicate suffixes if already present in metric name when UTF8 is enabled. (#7088) +- `SetBody` method of `Record` in `go.opentelemetry.io/otel/sdk/log` now deduplicates key-value collections (`log.Value` of `log.KindMap` from `go.opentelemetry.io/otel/log`). (#7002) - Fix the `go.opentelemetry.io/otel/exporters/stdout/stdouttrace` self-observability component type and name. (#7195) - Fix partial export count metric in `go.opentelemetry.io/otel/exporters/stdout/stdouttrace`. (#7199) diff --git a/sdk/log/logger.go b/sdk/log/logger.go index c1052f5c825..55423748621 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -108,7 +108,6 @@ func (l *logger) newRecord(ctx context.Context, r log.Record) Record { observedTimestamp: r.ObservedTimestamp(), severity: r.Severity(), severityText: r.SeverityText(), - body: r.Body(), traceID: sc.TraceID(), spanID: sc.SpanID(), @@ -124,6 +123,9 @@ func (l *logger) newRecord(ctx context.Context, r log.Record) Record { l.logCreatedMetric.Add(ctx, 1) } + // This ensures we deduplicate key-value collections in the log body + newRecord.SetBody(r.Body()) + // This field SHOULD be set once the event is observed by OpenTelemetry. if newRecord.observedTimestamp.IsZero() { newRecord.observedTimestamp = now() diff --git a/sdk/log/logger_test.go b/sdk/log/logger_test.go index a9c6eb87bc5..b326467a4e8 100644 --- a/sdk/log/logger_test.go +++ b/sdk/log/logger_test.go @@ -57,10 +57,20 @@ func TestLoggerEmit(t *testing.T) { rWithNoObservedTimestamp := r rWithNoObservedTimestamp.SetObservedTimestamp(time.Time{}) - rWithoutDeduplicateAttributes := r - rWithoutDeduplicateAttributes.AddAttributes( + rWithAllowKeyDuplication := r + rWithAllowKeyDuplication.AddAttributes( log.String("k1", "str1"), ) + rWithAllowKeyDuplication.SetBody(log.MapValue( + log.Int64("1", 2), + log.Int64("1", 3), + )) + + rWithDuplicatesInBody := r + rWithDuplicatesInBody.SetBody(log.MapValue( + log.Int64("1", 2), + log.Int64("1", 3), + )) contextWithSpanContext := trace.ContextWithSpanContext( context.Background(), @@ -222,7 +232,7 @@ func TestLoggerEmit(t *testing.T) { }, }, { - name: "WithoutAttributeDeduplication", + name: "WithAllowKeyDuplication", logger: newLogger(NewLoggerProvider( WithProcessor(p0), WithProcessor(p1), @@ -232,15 +242,15 @@ func TestLoggerEmit(t *testing.T) { WithAllowKeyDuplication(), ), instrumentation.Scope{Name: "scope"}), ctx: context.Background(), - record: rWithoutDeduplicateAttributes, + record: rWithAllowKeyDuplication, expectedRecords: []Record{ { - eventName: r.EventName(), - timestamp: r.Timestamp(), - body: r.Body(), - severity: r.Severity(), - severityText: r.SeverityText(), - observedTimestamp: r.ObservedTimestamp(), + eventName: rWithAllowKeyDuplication.EventName(), + timestamp: rWithAllowKeyDuplication.Timestamp(), + body: rWithAllowKeyDuplication.Body(), + severity: rWithAllowKeyDuplication.Severity(), + severityText: rWithAllowKeyDuplication.SeverityText(), + observedTimestamp: rWithAllowKeyDuplication.ObservedTimestamp(), resource: resource.NewSchemaless(attribute.String("key", "value")), attributeValueLengthLimit: 5, attributeCountLimit: 5, @@ -255,6 +265,39 @@ func TestLoggerEmit(t *testing.T) { }, }, }, + { + name: "WithDuplicatesInBody", + logger: newLogger(NewLoggerProvider( + WithProcessor(p0), + WithProcessor(p1), + WithAttributeValueLengthLimit(5), + WithAttributeCountLimit(5), + WithResource(resource.NewSchemaless(attribute.String("key", "value"))), + ), instrumentation.Scope{Name: "scope"}), + ctx: context.Background(), + record: rWithDuplicatesInBody, + expectedRecords: []Record{ + { + eventName: rWithDuplicatesInBody.EventName(), + timestamp: rWithDuplicatesInBody.Timestamp(), + body: log.MapValue( + log.Int64("1", 3), + ), + severity: rWithDuplicatesInBody.Severity(), + severityText: rWithDuplicatesInBody.SeverityText(), + observedTimestamp: rWithDuplicatesInBody.ObservedTimestamp(), + resource: resource.NewSchemaless(attribute.String("key", "value")), + attributeValueLengthLimit: 5, + attributeCountLimit: 5, + scope: &instrumentation.Scope{Name: "scope"}, + front: [attributesInlineCount]log.KeyValue{ + log.String("k1", "str"), + log.Float64("k2", 1.0), + }, + nFront: 2, + }, + }, + }, } for _, tc := range testCases { diff --git a/sdk/log/record.go b/sdk/log/record.go index 3f6fec5f375..9dfd69b645b 100644 --- a/sdk/log/record.go +++ b/sdk/log/record.go @@ -170,7 +170,11 @@ func (r *Record) Body() log.Value { // SetBody sets the body of the log record. func (r *Record) SetBody(v log.Value) { - r.body = v + if !r.allowDupKeys { + r.body = r.dedupeBodyCollections(v) + } else { + r.body = v + } } // WalkAttributes walks all attributes the log record holds by calling f for @@ -452,6 +456,24 @@ func (r *Record) applyValueLimits(val log.Value) log.Value { return val } +func (r *Record) dedupeBodyCollections(val log.Value) log.Value { + switch val.Kind() { + case log.KindSlice: + sl := val.AsSlice() + for i := range sl { + sl[i] = r.dedupeBodyCollections(sl[i]) + } + val = log.SliceValue(sl...) + case log.KindMap: + kvs, _ := dedup(val.AsMap()) + for i := range kvs { + kvs[i].Value = r.dedupeBodyCollections(kvs[i].Value) + } + val = log.MapValue(kvs...) + } + return val +} + // truncate returns a truncated version of s such that it contains less than // the limit number of characters. Truncation is applied by returning the limit // number of valid characters contained in s. diff --git a/sdk/log/record_test.go b/sdk/log/record_test.go index 8132dc59585..103e673a15f 100644 --- a/sdk/log/record_test.go +++ b/sdk/log/record_test.go @@ -56,10 +56,82 @@ func TestRecordSeverityText(t *testing.T) { } func TestRecordBody(t *testing.T) { - v := log.BoolValue(true) - r := new(Record) - r.SetBody(v) - assert.True(t, v.Equal(r.Body())) + testcases := []struct { + name string + allowDuplicates bool + body log.Value + want log.Value + }{ + { + name: "Bool", + body: log.BoolValue(true), + want: log.BoolValue(true), + }, + { + name: "slice", + body: log.SliceValue(log.BoolValue(true), log.BoolValue(false)), + want: log.SliceValue(log.BoolValue(true), log.BoolValue(false)), + }, + { + name: "map", + body: log.MapValue( + log.Bool("0", true), + log.Int64("1", 2), // This should be removed + log.Float64("2", 3.0), + log.String("3", "forth"), + log.Slice("4", log.Int64Value(1)), + log.Map("5", log.Int("key", 2)), + log.Bytes("6", []byte("six")), + log.Int64("1", 3), + ), + want: log.MapValue( + log.Bool("0", true), + log.Float64("2", 3.0), + log.String("3", "forth"), + log.Slice("4", log.Int64Value(1)), + log.Map("5", log.Int("key", 2)), + log.Bytes("6", []byte("six")), + log.Int64("1", 3), + ), + }, + { + name: "nestedMap", + body: log.MapValue( + log.Map("key", + log.Int64("key", 1), + log.Int64("key", 2), + ), + ), + want: log.MapValue( + log.Map("key", + log.Int64("key", 2), + ), + ), + }, + { + name: "map - allow duplicates", + allowDuplicates: true, + body: log.MapValue( + log.Int64("1", 2), + log.Int64("1", 3), + ), + want: log.MapValue( + log.Int64("1", 2), + log.Int64("1", 3), + ), + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + r := new(Record) + r.allowDupKeys = tc.allowDuplicates + r.SetBody(tc.body) + got := r.Body() + if !got.Equal(tc.want) { + t.Errorf("r.Body() = %v, want %v", got, tc.want) + } + }) + } } func TestRecordAttributes(t *testing.T) { @@ -951,3 +1023,23 @@ func BenchmarkSetAddAttributes(b *testing.B) { } }) } + +func BenchmarkSetBody(b *testing.B) { + b.Run("SetBody", func(b *testing.B) { + records := make([]Record, b.N) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + records[i].SetBody(log.MapValue( + log.Bool("0", true), + log.Float64("2", 3.0), + log.String("3", "forth"), + log.Slice("4", log.Int64Value(1)), + log.Map("5", log.Int("key", 2)), + log.Bytes("6", []byte("six")), + log.Int64("1", 3), + )) + } + }) +}