Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486)
- Add experimental observability metrics for simple span processor in `go.opentelemetry.io/otel/sdk/trace`. (#7374)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7512)
- Add experimental observability metrics for simple log processor in `go.opentelemetry.io/otel/sdk/log`. (#7548)

### Fixed

Expand Down
31 changes: 31 additions & 0 deletions sdk/log/internal/counter/counter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions sdk/log/internal/counter/counter_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdk/log/internal/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
// Package internal provides internal functionality for the sdk/log package.
package internal // import "go.opentelemetry.io/otel/sdk/log/internal"

//go:generate gotmpl --body=../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/sdk/log/internal/counter\" }" --out=counter/counter.go
//go:generate gotmpl --body=../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go

//go:generate gotmpl --body=../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/sdk/log\" }" --out=x/x.go
//go:generate gotmpl --body=../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go
25 changes: 21 additions & 4 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/log/internal/counter"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
)

// Compile-time check SimpleProcessor implements Processor.
Expand All @@ -17,8 +21,8 @@ var _ Processor = (*SimpleProcessor)(nil)
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter

noCmp [0]func() //nolint: unused // This is indeed used.
inst *observ.SLP
noCmp [0]func() //nolint: unused // This is indeed used.
}

// NewSimpleProcessor is a simple Processor adapter.
Expand All @@ -30,7 +34,15 @@ type SimpleProcessor struct {
// [NewBatchProcessor] instead. However, there may be exceptions where certain
// [Exporter] implementations perform better with this Processor.
func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor {
return &SimpleProcessor{exporter: exporter}
slp := &SimpleProcessor{
exporter: exporter,
}
var err error
slp.inst, err = observ.NewSLP(counter.NextExporterID())
if err != nil {
otel.Handle(err)
}
return slp
}

var simpleProcRecordsPool = sync.Pool{
Expand All @@ -41,7 +53,7 @@ var simpleProcRecordsPool = sync.Pool{
}

// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) (err error) {
if s.exporter == nil {
return nil
}
Expand All @@ -55,6 +67,11 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
simpleProcRecordsPool.Put(records)
}()

if s.inst != nil {
defer func() {
s.inst.LogProcessed(ctx, err)
}()
}
return s.exporter.Export(ctx, *records)
}

Expand Down
192 changes: 192 additions & 0 deletions sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@ package log_test
import (
"context"
"io"
"strconv"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/log/internal/counter"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

type exporter struct {
Expand All @@ -40,6 +52,17 @@ func (e *exporter) ForceFlush(context.Context) error {
return nil
}

var _ log.Exporter = (*failingTestExporter)(nil)

type failingTestExporter struct {
exporter
}

func (f *failingTestExporter) Export(ctx context.Context, r []log.Record) error {
_ = f.exporter.Export(ctx, r)
return assert.AnError
}

func TestSimpleProcessorOnEmit(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
Expand Down Expand Up @@ -138,3 +161,172 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
_ = out
})
}

func BenchmarkSimpleProcessorObservability(b *testing.B) {
run := func(b *testing.B) {
slp := log.NewSimpleProcessor(&failingTestExporter{exporter: exporter{}})
record := new(log.Record)
record.SetSeverityText("test")

ctx := b.Context()
b.ReportAllocs()
b.ResetTimer()

var err error
for b.Loop() {
err = slp.OnEmit(ctx, record)
}
_ = err
}

b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", run)
}

func TestSimpleLogProcessorObservability(t *testing.T) {
testcases := []struct {
name string
enabled bool
exporter log.Exporter
wantErr error
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "disabled",
enabled: false,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "enabled",
enabled: true,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreTimestamp(),
)
},
},
{
name: "Enable Exporter error",
enabled: true,
wantErr: assert.AnError,
exporter: &failingTestExporter{
exporter: exporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]
p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/log/internal/observ",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(tc.enabled))

original := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(original)
})

r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)

slp := log.NewSimpleProcessor(tc.exporter)
record := new(log.Record)
record.SetSeverityText("test")
err := slp.OnEmit(t.Context(), record)
require.ErrorIs(t, err, tc.wantErr)
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
tc.assertMetrics(t, rm)
counter.SetExporterID(0)
})
}
}
Loading