diff --git a/CHANGELOG.md b/CHANGELOG.md index bc6c571bb4a..929340dcd7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add experimental observability metrics for manual reader in `go.opentelemetry.io/otel/sdk/metric`. (#7524) - Add experimental observability metrics for periodic reader in `go.opentelemetry.io/otel/sdk/metric`. (#7571) - Support `OTEL_EXPORTER_OTLP_LOGS_INSECURE` and `OTEL_EXPORTER_OTLP_INSECURE` environmental variables in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7608) +- Add experimental observability metrics for simple log processor in `go.opentelemetry.io/otel/sdk/log`. (#7548) ### Fixed diff --git a/sdk/log/internal/observ/simple_log_processor.go b/sdk/log/internal/observ/simple_log_processor.go index f69bc5f1d38..932eec07e94 100644 --- a/sdk/log/internal/observ/simple_log_processor.go +++ b/sdk/log/internal/observ/simple_log_processor.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -33,6 +34,24 @@ var measureAttrsPool = sync.Pool{ }, } +// simpleProcessorN is a global 0-based count of the number of simple processor created. +var simpleProcessorN atomic.Int64 + +// NextSimpleProcessorID returns the next unique ID for a simpleProcessor. +func NextSimpleProcessorID() int64 { + const inc = 1 + return simpleProcessorN.Add(inc) - inc +} + +// SetSimpleProcessorID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetSimpleProcessorID(v int64) int64 { + return simpleProcessorN.Swap(v) +} + // GetSLPComponentName returns the component name attribute for a // SimpleLogProcessor with the given ID. func GetSLPComponentName(id int64) attribute.KeyValue { diff --git a/sdk/log/internal/observ/simple_log_processor_test.go b/sdk/log/internal/observ/simple_log_processor_test.go index a186d656c4d..edf4214177b 100644 --- a/sdk/log/internal/observ/simple_log_processor_test.go +++ b/sdk/log/internal/observ/simple_log_processor_test.go @@ -5,6 +5,7 @@ package observ import ( "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -23,6 +24,52 @@ import ( "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) +func TestNextExporterID(t *testing.T) { + SetSimpleProcessorID(0) + + var expected int64 + for range 10 { + id := NextSimpleProcessorID() + assert.Equal(t, expected, id) + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetSimpleProcessorID(0) + + prev := SetSimpleProcessorID(42) + assert.Equal(t, int64(0), prev) + + id := NextSimpleProcessorID() + assert.Equal(t, int64(42), id) +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetSimpleProcessorID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextSimpleProcessorID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + id := NextSimpleProcessorID() + assert.Equal(t, expected, id) +} + type errMeterProvider struct { mapi.MeterProvider err error diff --git a/sdk/log/simple.go b/sdk/log/simple.go index 002e52cae66..e24259057e7 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -6,6 +6,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/log/internal/observ" ) // Compile-time check SimpleProcessor implements Processor. @@ -17,8 +20,8 @@ var _ Processor = (*SimpleProcessor)(nil) type SimpleProcessor struct { mu sync.Mutex exporter Exporter - - noCmp [0]func() //nolint: unused // This is indeed used. + inst *observ.SLP + noCmp [0]func() //nolint: unused // This is indeed used. } // NewSimpleProcessor is a simple Processor adapter. @@ -30,7 +33,15 @@ type SimpleProcessor struct { // [NewBatchProcessor] instead. However, there may be exceptions where certain // [Exporter] implementations perform better with this Processor. func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor { - return &SimpleProcessor{exporter: exporter} + slp := &SimpleProcessor{ + exporter: exporter, + } + var err error + slp.inst, err = observ.NewSLP(observ.NextSimpleProcessorID()) + if err != nil { + otel.Handle(err) + } + return slp } var simpleProcRecordsPool = sync.Pool{ @@ -41,7 +52,7 @@ var simpleProcRecordsPool = sync.Pool{ } // OnEmit batches provided log record. -func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { +func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) (err error) { if s.exporter == nil { return nil } @@ -55,6 +66,11 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { simpleProcRecordsPool.Put(records) }() + if s.inst != nil { + defer func() { + s.inst.LogProcessed(ctx, err) + }() + } return s.exporter.Export(ctx, *records) } diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index 0c59c68a284..6704513bc3e 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -6,6 +6,7 @@ package log_test import ( "context" "io" + "strconv" "strings" "sync" "testing" @@ -13,7 +14,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/log/internal/observ" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) type exporter struct { @@ -40,6 +51,17 @@ func (e *exporter) ForceFlush(context.Context) error { return nil } +var _ log.Exporter = (*failingTestExporter)(nil) + +type failingTestExporter struct { + exporter +} + +func (f *failingTestExporter) Export(ctx context.Context, r []log.Record) error { + _ = f.exporter.Export(ctx, r) + return assert.AnError +} + func TestSimpleProcessorOnEmit(t *testing.T) { e := new(exporter) s := log.NewSimpleProcessor(e) @@ -138,3 +160,172 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) { _ = out }) } + +func BenchmarkSimpleProcessorObservability(b *testing.B) { + run := func(b *testing.B) { + slp := log.NewSimpleProcessor(&failingTestExporter{exporter: exporter{}}) + record := new(log.Record) + record.SetSeverityText("test") + + ctx := b.Context() + b.ReportAllocs() + b.ResetTimer() + + var err error + for b.Loop() { + err = slp.OnEmit(ctx, record) + } + _ = err + } + + b.Run("Observability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + run(b) + }) + b.Run("NoObservability", run) +} + +func TestSimpleLogProcessorObservability(t *testing.T) { + testcases := []struct { + name string + enabled bool + exporter log.Exporter + wantErr error + assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics) + }{ + { + name: "disabled", + enabled: false, + exporter: new(exporter), + assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) { + assert.Empty(t, rm.ScopeMetrics) + }, + }, + { + name: "enabled", + enabled: true, + exporter: new(exporter), + assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) { + assert.Len(t, rm.ScopeMetrics, 1) + sm := rm.ScopeMetrics[0] + + p := otelconv.SDKProcessorLogProcessed{} + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: p.Name(), + Description: p.Description(), + Unit: p.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + observ.GetSLPComponentName(0), + semconv.OTelComponentTypeKey.String( + string(otelconv.ComponentTypeSimpleLogProcessor), + ), + ), + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + } + + metricdatatest.AssertEqual( + t, + want, + sm, + metricdatatest.IgnoreExemplars(), + metricdatatest.IgnoreTimestamp(), + ) + }, + }, + { + name: "Enable Exporter error", + enabled: true, + wantErr: assert.AnError, + exporter: &failingTestExporter{ + exporter: exporter{}, + }, + assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) { + assert.Len(t, rm.ScopeMetrics, 1) + sm := rm.ScopeMetrics[0] + p := otelconv.SDKProcessorLogProcessed{} + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/sdk/log/internal/observ", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: p.Name(), + Description: p.Description(), + Unit: p.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + observ.GetSLPComponentName(0), + semconv.OTelComponentTypeKey.String( + string(otelconv.ComponentTypeSimpleLogProcessor), + ), + semconv.ErrorTypeKey.String("*errors.errorString"), + ), + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + } + + metricdatatest.AssertEqual( + t, + want, + sm, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + ) + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(tc.enabled)) + + original := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(original) + }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + slp := log.NewSimpleProcessor(tc.exporter) + record := new(log.Record) + record.SetSeverityText("test") + err := slp.OnEmit(t.Context(), record) + require.ErrorIs(t, err, tc.wantErr) + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(t.Context(), &rm)) + tc.assertMetrics(t, rm) + observ.SetSimpleProcessorID(0) + }) + } +}