Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/otel/semconv/v1.36.0` package.
The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions.
See the [migration documentation](./semconv/v1.36.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.34.0.`(#7032)
- Add experimental self-observability span metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027)
- Add experimental self-observability span and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393)
- Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772)

### Changed
Expand Down
87 changes: 86 additions & 1 deletion sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -26,6 +33,8 @@ const (
DefaultMaxExportBatchSize = 512
)

var queueFull = otelconv.ErrorTypeAttr("queue_full")

// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

Expand Down Expand Up @@ -69,6 +78,11 @@ type batchSpanProcessor struct {
queue chan ReadOnlySpan
dropped uint32

selfObservabilityEnabled bool
callbackRegistration metric.Registration
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
componentNameAttr attribute.KeyValue

batch []ReadOnlySpan
batchMutex sync.Mutex
timer *time.Timer
Expand Down Expand Up @@ -110,6 +124,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
stopCh: make(chan struct{}),
}

bsp.configureSelfObservability()

bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
Expand All @@ -120,6 +136,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
return bsp
}

var processorIDCounter atomic.Int64

// nextProcessorID returns an identifier for this batch span processor,
// starting with 0 and incrementing by 1 each time it is called.
func nextProcessorID() int64 {
return processorIDCounter.Add(1) - 1
}

// configureSelfObservability configures metrics for the batch span processor.
func (bsp *batchSpanProcessor) configureSelfObservability() {
if !x.SelfObservability.Enabled() {
return
}
bsp.selfObservabilityEnabled = true
bsp.componentNameAttr = semconv.OTelComponentName(
fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()))
meter := otel.GetMeterProvider().Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
if err != nil {
otel.Handle(err)
}
queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter)
if err != nil {
otel.Handle(err)
}
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
if err != nil {
otel.Handle(err)
}

callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr,
semconv.OTelComponentTypeBatchingSpanProcessor)
bsp.callbackRegistration, err = meter.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callabckAttributesOpt)
o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callabckAttributesOpt)
return nil
},
queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst())
if err != nil {
otel.Handle(err)
}
}

// OnStart method does nothing.
func (bsp *batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}

Expand Down Expand Up @@ -160,6 +225,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
case <-ctx.Done():
err = ctx.Err()
}
if bsp.selfObservabilityEnabled {
err = errors.Join(err, bsp.callbackRegistration.Unregister())
}
})
return err
}
Expand Down Expand Up @@ -272,6 +340,11 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {

if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, int64(l),
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
}
err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
Expand Down Expand Up @@ -380,11 +453,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
case bsp.queue <- sd:
return true
case <-ctx.Done():
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
}
return false
}
}

func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
Expand All @@ -394,6 +473,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
}
}
return false
}
Expand Down
Loading
Loading