Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7512)
- Add experimental observability metrics for manual reader in `go.opentelemetry.io/otel/sdk/metric`. (#7524)
- Add experimental observability metrics for periodic reader in `go.opentelemetry.io/otel/sdk/metric`. (#7571)
- Add experimental observability metrics for simple log processor in `go.opentelemetry.io/otel/sdk/log`. (#7548)

### Fixed

Expand Down
19 changes: 19 additions & 0 deletions sdk/log/internal/observ/simple_log_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -33,6 +34,24 @@ var measureAttrsPool = sync.Pool{
},
}

// simpleProcessorN is a global 0-based count of the number of simple processor created.
var simpleProcessorN atomic.Int64

// NextSimpleProcessorID returns the next unique ID for an simpleProcessor.
func NextSimpleProcessorID() int64 {
const inc = 1
return simpleProcessorN.Add(inc) - inc
}

// SetSimpleProcessorID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetSimpleProcessorID(v int64) int64 {
return simpleProcessorN.Swap(v)
}

// GetSLPComponentName returns the component name attribute for a
// SimpleLogProcessor with the given ID.
func GetSLPComponentName(id int64) attribute.KeyValue {
Expand Down
47 changes: 47 additions & 0 deletions sdk/log/internal/observ/simple_log_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package observ

import (
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -23,6 +24,52 @@ import (
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

func TestNextExporterID(t *testing.T) {
SetSimpleProcessorID(0)

var expected int64
for range 10 {
id := NextSimpleProcessorID()
assert.Equal(t, expected, id)
expected++
}
}

func TestSetExporterID(t *testing.T) {
SetSimpleProcessorID(0)

prev := SetSimpleProcessorID(42)
assert.Equal(t, int64(0), prev)

id := NextSimpleProcessorID()
assert.Equal(t, int64(42), id)
}

func TestNextExporterIDConcurrentSafe(t *testing.T) {
SetSimpleProcessorID(0)

const goroutines = 100
const increments = 10

var wg sync.WaitGroup
wg.Add(goroutines)

for range goroutines {
go func() {
defer wg.Done()
for range increments {
NextSimpleProcessorID()
}
}()
}

wg.Wait()

expected := int64(goroutines * increments)
id := NextSimpleProcessorID()
assert.Equal(t, expected, id)
}

type errMeterProvider struct {
mapi.MeterProvider
err error
Expand Down
24 changes: 20 additions & 4 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
)

// Compile-time check SimpleProcessor implements Processor.
Expand All @@ -17,8 +20,8 @@ var _ Processor = (*SimpleProcessor)(nil)
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter

noCmp [0]func() //nolint: unused // This is indeed used.
inst *observ.SLP
noCmp [0]func() //nolint: unused // This is indeed used.
}

// NewSimpleProcessor is a simple Processor adapter.
Expand All @@ -30,7 +33,15 @@ type SimpleProcessor struct {
// [NewBatchProcessor] instead. However, there may be exceptions where certain
// [Exporter] implementations perform better with this Processor.
func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor {
return &SimpleProcessor{exporter: exporter}
slp := &SimpleProcessor{
exporter: exporter,
}
var err error
slp.inst, err = observ.NewSLP(observ.NextSimpleProcessorID())
if err != nil {
otel.Handle(err)
}
return slp
}

var simpleProcRecordsPool = sync.Pool{
Expand All @@ -41,7 +52,7 @@ var simpleProcRecordsPool = sync.Pool{
}

// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) (err error) {
if s.exporter == nil {
return nil
}
Expand All @@ -55,6 +66,11 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
simpleProcRecordsPool.Put(records)
}()

if s.inst != nil {
defer func() {
s.inst.LogProcessed(ctx, err)
}()
}
return s.exporter.Export(ctx, *records)
}

Expand Down
191 changes: 191 additions & 0 deletions sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,25 @@ package log_test
import (
"context"
"io"
"strconv"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

type exporter struct {
Expand All @@ -40,6 +51,17 @@ func (e *exporter) ForceFlush(context.Context) error {
return nil
}

var _ log.Exporter = (*failingTestExporter)(nil)

type failingTestExporter struct {
exporter
}

func (f *failingTestExporter) Export(ctx context.Context, r []log.Record) error {
_ = f.exporter.Export(ctx, r)
return assert.AnError
}

func TestSimpleProcessorOnEmit(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
Expand Down Expand Up @@ -138,3 +160,172 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
_ = out
})
}

func BenchmarkSimpleProcessorObservability(b *testing.B) {
run := func(b *testing.B) {
slp := log.NewSimpleProcessor(&failingTestExporter{exporter: exporter{}})
record := new(log.Record)
record.SetSeverityText("test")

ctx := b.Context()
b.ReportAllocs()
b.ResetTimer()

var err error
for b.Loop() {
err = slp.OnEmit(ctx, record)
}
_ = err
}

b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", run)
}

func TestSimpleLogProcessorObservability(t *testing.T) {
testcases := []struct {
name string
enabled bool
exporter log.Exporter
wantErr error
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "disabled",
enabled: false,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "enabled",
enabled: true,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreTimestamp(),
)
},
},
{
name: "Enable Exporter error",
enabled: true,
wantErr: assert.AnError,
exporter: &failingTestExporter{
exporter: exporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]
p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/log/internal/observ",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(tc.enabled))

original := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(original)
})

r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)

slp := log.NewSimpleProcessor(tc.exporter)
record := new(log.Record)
record.SetSeverityText("test")
err := slp.OnEmit(t.Context(), record)
require.ErrorIs(t, err, tc.wantErr)
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
tc.assertMetrics(t, rm)
observ.SetSimpleProcessorID(0)
})
}
}
Loading