diff --git a/CHANGELOG.md b/CHANGELOG.md index f3abcfdc2e3..7c7fa0cf381 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,8 @@ The next release will require at least [Go 1.24]. - The `go.opentelemetry.io/otel/semconv/v1.37.0` package. The package contains semantic conventions from the `v1.37.0` version of the OpenTelemetry Semantic Conventions. See the [migration documentation](./semconv/v1.37.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.36.0.`(#7254) +- Add experimental self-observability metrics for the simple log processor in `go.opentelemetry.io/otel/sdk/log`. + Adds `otel.sdk.processor.log.processed` metric when `OTEL_GO_X_SELF_OBSERVABILITY` environment variable is set to `true`. (#7127) ### Changed diff --git a/sdk/log/internal/x/README.md b/sdk/log/internal/x/README.md index 83e9e7b4cef..e49bc2cab8a 100644 --- a/sdk/log/internal/x/README.md +++ b/sdk/log/internal/x/README.md @@ -19,6 +19,7 @@ To opt-in, set the environment variable `OTEL_GO_X_SELF_OBSERVABILITY` to `true` When enabled, the SDK will create the following metrics using the global `MeterProvider`: - `otel.sdk.log.created` +- `otel.sdk.processor.log.processed` Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics. diff --git a/sdk/log/simple.go b/sdk/log/simple.go index 002e52cae66..c6a984781ea 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -5,12 +5,25 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + "fmt" "sync" + "sync/atomic" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/log/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" ) // Compile-time check SimpleProcessor implements Processor. var _ Processor = (*SimpleProcessor)(nil) +// simpleProcessorIDCounter is used to generate unique component names. +var simpleProcessorIDCounter atomic.Uint64 + // SimpleProcessor is an processor that synchronously exports log records. // // Use [NewSimpleProcessor] to create a SimpleProcessor. @@ -18,6 +31,10 @@ type SimpleProcessor struct { mu sync.Mutex exporter Exporter + selfObservabilityEnabled bool + processedMetric otelconv.SDKProcessorLogProcessed + componentName string + noCmp [0]func() //nolint: unused // This is indeed used. } @@ -30,7 +47,33 @@ type SimpleProcessor struct { // [NewBatchProcessor] instead. However, there may be exceptions where certain // [Exporter] implementations perform better with this Processor. func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor { - return &SimpleProcessor{exporter: exporter} + s := &SimpleProcessor{ + exporter: exporter, + componentName: fmt.Sprintf( + "%s/%d", + string(otelconv.ComponentTypeSimpleLogProcessor), + simpleProcessorIDCounter.Add(1)-1, + ), + } + s.initSelfObservability() + return s +} + +func (s *SimpleProcessor) initSelfObservability() { + if !x.SelfObservability.Enabled() { + return + } + + s.selfObservabilityEnabled = true + mp := otel.GetMeterProvider() + m := mp.Meter("go.opentelemetry.io/otel/sdk/log", + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(semconv.SchemaURL)) + + var err error + if s.processedMetric, err = otelconv.NewSDKProcessorLogProcessed(m); err != nil { + otel.Handle(err) + } } var simpleProcRecordsPool = sync.Pool{ @@ -41,11 +84,7 @@ var simpleProcRecordsPool = sync.Pool{ } // OnEmit batches provided log record. -func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { - if s.exporter == nil { - return nil - } - +func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) (err error) { s.mu.Lock() defer s.mu.Unlock() @@ -55,6 +94,22 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { simpleProcRecordsPool.Put(records) }() + if s.selfObservabilityEnabled { + defer func() { + attrs := make([]attribute.KeyValue, 2, 3) + attrs[0] = s.processedMetric.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor) + attrs[1] = s.processedMetric.AttrComponentName(s.componentName) + if err != nil { + attrs = append(attrs, s.processedMetric.AttrErrorType(otelconv.ErrorTypeOther)) + } + s.processedMetric.Add(context.Background(), int64(len(*records)), attrs...) + }() + } + + if s.exporter == nil { + return nil + } + return s.exporter.Export(ctx, *records) } diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index 394e4b7968f..8ae9c9c3d7f 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -5,17 +5,70 @@ package log_test import ( "context" + "errors" "io" "strings" "sync" "testing" + "go.opentelemetry.io/otel/attribute" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/log/internal/x" + metricSDK "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" ) +// extractAttributeValue extracts the value of a specific attribute from the first DataPoint found. +func extractAttributeValue(data any, attrKey attribute.Key) (attribute.Value, bool) { + switch d := data.(type) { + case metricdata.ResourceMetrics: + for _, scope := range d.ScopeMetrics { + for _, m := range scope.Metrics { + if val, ok := extractAttributeValue(m.Data, attrKey); ok { + return val, true + } + } + } + case metricdata.Sum[int64]: + for _, dp := range d.DataPoints { + if val, ok := dp.Attributes.Value(attrKey); ok { + return val, true + } + } + case metricdata.Sum[float64]: + for _, dp := range d.DataPoints { + if val, ok := dp.Attributes.Value(attrKey); ok { + return val, true + } + } + case metricdata.Gauge[int64]: + for _, dp := range d.DataPoints { + if val, ok := dp.Attributes.Value(attrKey); ok { + return val, true + } + } + case metricdata.Gauge[float64]: + for _, dp := range d.DataPoints { + if val, ok := dp.Attributes.Value(attrKey); ok { + return val, true + } + } + } + return attribute.Value{}, false +} + type exporter struct { records []log.Record @@ -120,6 +173,247 @@ func TestSimpleProcessorConcurrentSafe(*testing.T) { wg.Wait() } +type errorExporter struct { + err error +} + +func (e *errorExporter) Export(_ context.Context, _ []log.Record) error { + return e.err +} + +func (*errorExporter) Shutdown(context.Context) error { + return nil +} + +func (*errorExporter) ForceFlush(context.Context) error { + return nil +} + +type failingMeterProvider struct { + noop.MeterProvider +} + +func (*failingMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return &failingMeter{Meter: noop.NewMeterProvider().Meter(name, opts...)} +} + +type failingMeter struct { + metric.Meter +} + +func (*failingMeter) Int64Counter(_ string, _ ...metric.Int64CounterOption) (metric.Int64Counter, error) { + return nil, errors.New("failed to create counter") +} + +func TestSimpleProcessorSelfObservability(t *testing.T) { + originalMP := otel.GetMeterProvider() + setupCleanMeterProvider := func(t *testing.T) { + t.Cleanup(func() { + otel.SetMeterProvider(originalMP) + }) + } + + t.Run("self observability disabled", func(t *testing.T) { + setupCleanMeterProvider(t) + + reader := metricSDK.NewManualReader() + mp := metricSDK.NewMeterProvider(metricSDK.WithReader(reader)) + otel.SetMeterProvider(mp) + + e := new(exporter) + s := log.NewSimpleProcessor(e) + + r := new(log.Record) + r.SetSeverityText("test") + _ = s.OnEmit(context.Background(), r) + + require.True(t, e.exportCalled) + assert.Equal(t, []log.Record{*r}, e.records) + + rm := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &rm) + require.NoError(t, err) + + expected := metricdata.ResourceMetrics{ + Resource: rm.Resource, + ScopeMetrics: []metricdata.ScopeMetrics{}, + } + + metricdatatest.AssertEqual(t, expected, rm, metricdatatest.IgnoreTimestamp()) + }) + + t.Run("self observability enabled without error", func(t *testing.T) { + setupCleanMeterProvider(t) + + t.Setenv(x.SelfObservability.Key(), "true") + + reader := metricSDK.NewManualReader() + mp := metricSDK.NewMeterProvider(metricSDK.WithReader(reader)) + otel.SetMeterProvider(mp) + + e := new(exporter) + s := log.NewSimpleProcessor(e) + + r := new(log.Record) + r.SetSeverityText("test") + + var err error + err = s.OnEmit(context.Background(), r) + require.NoError(t, err) + + err = s.OnEmit(context.Background(), r) + require.NoError(t, err) + + err = s.OnEmit(context.Background(), r) + require.NoError(t, err) + + // First collect to get the actual component name + rm := metricdata.ResourceMetrics{} + err = reader.Collect(context.Background(), &rm) + require.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + + // Extract the actual component name and use it in expected structure + componentVal, ok := extractAttributeValue(rm, "otel.component.name") + require.True(t, ok, "component name attribute should be present") + actualComponentName := componentVal.AsString() + + expected := metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/sdk/log", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: "otel.sdk.processor.log.processed", + Description: "The number of log records for which the processing has finished, either successful or failed", + Unit: "{log_record}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet( + attribute.String( + "otel.component.type", + string(otelconv.ComponentTypeSimpleLogProcessor), + ), + attribute.String("otel.component.name", actualComponentName), + ), + Exemplars: []metricdata.Exemplar[int64]{}, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + }, + }, + } + + require.Len(t, rm.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, expected.ScopeMetrics[0], rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + }) + + t.Run("self observability enabled with error", func(t *testing.T) { + setupCleanMeterProvider(t) + + t.Setenv(x.SelfObservability.Key(), "true") + + reader := metricSDK.NewManualReader() + mp := metricSDK.NewMeterProvider(metricSDK.WithReader(reader)) + otel.SetMeterProvider(mp) + + e := &errorExporter{err: errors.New("export failed")} + s := log.NewSimpleProcessor(e) + + r := new(log.Record) + r.SetSeverityText("test") + + var err error + err = s.OnEmit(context.Background(), r) + require.Error(t, err) + assert.Equal(t, "export failed", err.Error()) + + err = s.OnEmit(context.Background(), r) + require.Error(t, err) + assert.Equal(t, "export failed", err.Error()) + + rm := metricdata.ResourceMetrics{} + collectErr := reader.Collect(context.Background(), &rm) + require.NoError(t, collectErr) + require.Len(t, rm.ScopeMetrics, 1) + + componentVal, ok := extractAttributeValue(rm, "otel.component.name") + require.True(t, ok) + actualComponentName := componentVal.AsString() + + expected := metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/sdk/log", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: "otel.sdk.processor.log.processed", + Description: "The number of log records for which the processing has finished, either successful or failed", + Unit: "{log_record}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 2, + Attributes: attribute.NewSet( + attribute.String( + "otel.component.type", + string(otelconv.ComponentTypeSimpleLogProcessor), + ), + attribute.String("otel.component.name", actualComponentName), + attribute.String("error.type", string(otelconv.ErrorTypeOther)), + ), + Exemplars: []metricdata.Exemplar[int64]{}, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + }, + }, + } + + require.Len(t, rm.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, expected.ScopeMetrics[0], rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + }) + + t.Run("self observability metric creation error handled", func(t *testing.T) { + setupCleanMeterProvider(t) + + t.Setenv(x.SelfObservability.Key(), "true") + + failingMP := &failingMeterProvider{} + otel.SetMeterProvider(failingMP) + + assert.NotPanics(t, func() { + e := new(exporter) + s := log.NewSimpleProcessor(e) + + r := new(log.Record) + r.SetSeverityText("test") + _ = s.OnEmit(context.Background(), r) + + require.True(t, e.exportCalled) + assert.Equal(t, []log.Record{*r}, e.records) + }) + }) +} + func BenchmarkSimpleProcessorOnEmit(b *testing.B) { r := new(log.Record) r.SetSeverityText("test")