Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 53 additions & 118 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package trace_test
package trace

import (
"context"
Expand All @@ -12,22 +12,17 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/internal/env"
ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)

type testBatchExporter struct {
mu sync.Mutex
spans []sdktrace.ReadOnlySpan
spans []ReadOnlySpan
sizes []int
batchCount int
shutdownCount int
Expand All @@ -37,7 +32,7 @@ type testBatchExporter struct {
err error
}

func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []ReadOnlySpan) error {
t.mu.Lock()
defer t.mu.Unlock()

Expand Down Expand Up @@ -78,20 +73,20 @@ func (t *testBatchExporter) getBatchCount() int {
return t.batchCount
}

var _ sdktrace.SpanExporter = (*testBatchExporter)(nil)
var _ SpanExporter = (*testBatchExporter)(nil)

func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
tp := basicTracerProvider(t)
bsp := sdktrace.NewBatchSpanProcessor(nil)
bsp := NewBatchSpanProcessor(nil)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("NilExporter")

_, span := tr.Start(context.Background(), "foo")
span.End()

// These should not panic.
bsp.OnStart(context.Background(), span.(sdktrace.ReadWriteSpan))
bsp.OnEnd(span.(sdktrace.ReadOnlySpan))
bsp.OnStart(context.Background(), span.(ReadWriteSpan))
bsp.OnEnd(span.(ReadOnlySpan))
if err := bsp.ForceFlush(context.Background()); err != nil {
t.Errorf("failed to ForceFlush the BatchSpanProcessor: %v", err)
}
Expand All @@ -102,7 +97,7 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {

type testOption struct {
name string
o []sdktrace.BatchSpanProcessorOption
o []BatchSpanProcessorOption
wantNumSpans int
wantBatchCount int
genNumSpans int
Expand All @@ -121,50 +116,50 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
},
{
name: "non-default BatchTimeout",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
},
wantNumSpans: 2053,
wantBatchCount: 4,
genNumSpans: 2053,
},
{
name: "non-default MaxQueueSize and BatchTimeout",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(200),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(200),
},
wantNumSpans: 205,
wantBatchCount: 1,
genNumSpans: 205,
},
{
name: "non-default MaxQueueSize, BatchTimeout and MaxExportBatchSize",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(205),
sdktrace.WithMaxExportBatchSize(20),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(205),
WithMaxExportBatchSize(20),
},
wantNumSpans: 210,
wantBatchCount: 11,
genNumSpans: 210,
},
{
name: "blocking option",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(200),
sdktrace.WithMaxExportBatchSize(20),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(200),
WithMaxExportBatchSize(20),
},
wantNumSpans: 205,
wantBatchCount: 11,
genNumSpans: 205,
},
{
name: "parallel span generation",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(200),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(200),
},
wantNumSpans: 205,
wantBatchCount: 1,
Expand All @@ -173,9 +168,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
},
{
name: "parallel span blocking",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxExportBatchSize(200),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxExportBatchSize(200),
},
wantNumSpans: 2000,
wantBatchCount: 10,
Expand Down Expand Up @@ -306,19 +301,19 @@ type stuckExporter struct {
}

// ExportSpans waits for ctx to expire and returns that error.
func (e *stuckExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
func (e *stuckExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error {
<-ctx.Done()
e.err = ctx.Err()
return ctx.Err()
}

func TestBatchSpanProcessorExportTimeout(t *testing.T) {
exp := new(stuckExporter)
bsp := sdktrace.NewBatchSpanProcessor(
bsp := NewBatchSpanProcessor(
exp,
// Set a non-zero export timeout so a deadline is set.
sdktrace.WithExportTimeout(1*time.Microsecond),
sdktrace.WithBlocking(),
WithExportTimeout(1*time.Microsecond),
WithBlocking(),
)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
Expand All @@ -332,10 +327,10 @@ func TestBatchSpanProcessorExportTimeout(t *testing.T) {
}
}

func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace.SpanProcessor {
func createAndRegisterBatchSP(option testOption, te *testBatchExporter) SpanProcessor {
// Always use blocking queue to avoid flaky tests.
options := append(option.o, sdktrace.WithBlocking())
return sdktrace.NewBatchSpanProcessor(te, options...)
options := append(option.o, WithBlocking())
return NewBatchSpanProcessor(te, options...)
}

func generateSpan(_ *testing.T, tr trace.Tracer, option testOption) {
Expand Down Expand Up @@ -382,7 +377,7 @@ func getSpanContext() trace.SpanContext {

func TestBatchSpanProcessorShutdown(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
bsp := NewBatchSpanProcessor(&bp)

err := bsp.Shutdown(context.Background())
if err != nil {
Expand All @@ -401,14 +396,14 @@ func TestBatchSpanProcessorShutdown(t *testing.T) {
func TestBatchSpanProcessorPostShutdown(t *testing.T) {
tp := basicTracerProvider(t)
be := testBatchExporter{}
bsp := sdktrace.NewBatchSpanProcessor(&be)
bsp := NewBatchSpanProcessor(&be)

tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("Normal")

generateSpanParallel(t, tr, testOption{
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxExportBatchSize(50),
o: []BatchSpanProcessorOption{
WithMaxExportBatchSize(50),
},
genNumSpans: 60,
})
Expand All @@ -428,9 +423,9 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
tp := basicTracerProvider(t)
option := testOption{
name: "default BatchSpanProcessorOptions",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(3000),
o: []BatchSpanProcessorOption{
WithMaxQueueSize(0),
WithMaxExportBatchSize(3000),
},
wantNumSpans: 2053,
wantBatchCount: 1,
Expand Down Expand Up @@ -468,9 +463,9 @@ func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
}
tp := basicTracerProvider(t)
option := testOption{
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(2000),
o: []BatchSpanProcessorOption{
WithMaxQueueSize(0),
WithMaxExportBatchSize(2000),
},
wantNumSpans: 1000,
wantBatchCount: 1,
Expand Down Expand Up @@ -545,7 +540,7 @@ func (e indefiniteExporter) Shutdown(context.Context) error {
return nil
}

func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error {
<-e.stop
return ctx.Err()
}
Expand All @@ -555,7 +550,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
// Cancel the context
cancel()

bsp := sdktrace.NewBatchSpanProcessor(newIndefiniteExporter(t))
bsp := NewBatchSpanProcessor(newIndefiniteExporter(t))
t.Cleanup(func() {
assert.NoError(t, bsp.Shutdown(context.Background()))
})
Expand All @@ -568,7 +563,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
tp := basicTracerProvider(t)
exp := newIndefiniteExporter(t)
bsp := sdktrace.NewBatchSpanProcessor(exp)
bsp := NewBatchSpanProcessor(exp)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())
_, span := tr.Start(context.Background(), "foo")
Expand All @@ -586,11 +581,10 @@ func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

exp := tracetest.NewInMemoryExporter()

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)
var bp testBatchExporter
bsp := NewBatchSpanProcessor(&bp)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
t.Cleanup(func() {
assert.NoError(t, tp.Shutdown(context.Background()))
})
Expand All @@ -604,14 +598,14 @@ func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
err := tp.ForceFlush(ctx)
assert.NoError(t, err)

assert.Len(t, exp.GetSpans(), i+1)
assert.Len(t, bp.spans, i+1)
}
}

func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
ctx := context.Background()
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
bsp := NewBatchSpanProcessor(&bp)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())
Expand Down Expand Up @@ -650,62 +644,3 @@ func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {

wg.Wait()
}

func BenchmarkSpanProcessorOnEnd(b *testing.B) {
for _, bb := range []struct {
batchSize int
spansCount int
}{
{batchSize: 10, spansCount: 10},
{batchSize: 10, spansCount: 100},
{batchSize: 100, spansCount: 10},
{batchSize: 100, spansCount: 100},
} {
b.Run(fmt.Sprintf("batch: %d, spans: %d", bb.batchSize, bb.spansCount), func(b *testing.B) {
bsp := sdktrace.NewBatchSpanProcessor(
tracetest.NewNoopExporter(),
sdktrace.WithMaxExportBatchSize(bb.batchSize),
)
b.Cleanup(func() {
_ = bsp.Shutdown(context.Background())
})
snap := tracetest.SpanStub{}.Snapshot()

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
// Ensure the export happens for every run
for j := 0; j < bb.spansCount; j++ {
bsp.OnEnd(snap)
}
}
})
}
}

func BenchmarkSpanProcessorVerboseLogging(b *testing.B) {
b.Cleanup(func(l logr.Logger) func() {
return func() { global.SetLogger(l) }
}(global.GetLogger()))
global.SetLogger(funcr.New(func(prefix, args string) {}, funcr.Options{Verbosity: 5}))
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(
tracetest.NewNoopExporter(),
sdktrace.WithMaxExportBatchSize(10),
))
b.Cleanup(func() {
_ = tp.Shutdown(context.Background())
})
tracer := tp.Tracer("bench")
ctx := context.Background()

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
for j := 0; j < 10; j++ {
_, span := tracer.Start(ctx, "bench")
span.End()
}
}
}
Loading
Loading