Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/otel/semconv/v1.36.0` package.
The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions.
See the [migration documentation](./semconv/v1.36.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.34.0.`(#7032)
- Add experimental self-observability span metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027)
- Add experimental self-observability span and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393)
- Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772)
- Add experimental self-observability log metrics in `go.opentelemetry.io/otel/sdk/log`.
Check the `go.opentelemetry.io/otel/sdk/log/internal/x` package documentation for more information. (#7121)
Expand Down
87 changes: 86 additions & 1 deletion sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -26,6 +33,8 @@ const (
DefaultMaxExportBatchSize = 512
)

var queueFull = otelconv.ErrorTypeAttr("queue_full")

// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

Expand Down Expand Up @@ -69,6 +78,11 @@ type batchSpanProcessor struct {
queue chan ReadOnlySpan
dropped uint32

selfObservabilityEnabled bool
callbackRegistration metric.Registration
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
componentNameAttr attribute.KeyValue

batch []ReadOnlySpan
batchMutex sync.Mutex
timer *time.Timer
Expand Down Expand Up @@ -110,6 +124,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
stopCh: make(chan struct{}),
}

bsp.configureSelfObservability()

bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
Expand All @@ -120,6 +136,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
return bsp
}

var processorIDCounter atomic.Int64

// nextProcessorID returns an identifier for this batch span processor,
// starting with 0 and incrementing by 1 each time it is called.
func nextProcessorID() int64 {
return processorIDCounter.Add(1) - 1
}

// configureSelfObservability configures metrics for the batch span processor.
func (bsp *batchSpanProcessor) configureSelfObservability() {
if !x.SelfObservability.Enabled() {
return
}
bsp.selfObservabilityEnabled = true
bsp.componentNameAttr = semconv.OTelComponentName(
fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()))
meter := otel.GetMeterProvider().Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
if err != nil {
otel.Handle(err)
}
queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter)
if err != nil {
otel.Handle(err)
}
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
if err != nil {
otel.Handle(err)
}

callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr,
semconv.OTelComponentTypeBatchingSpanProcessor)
bsp.callbackRegistration, err = meter.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callabckAttributesOpt)
o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callabckAttributesOpt)
return nil
},
queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst())
if err != nil {
otel.Handle(err)
}
}

// OnStart method does nothing.
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}

Expand Down Expand Up @@ -160,6 +225,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
case <-ctx.Done():
err = ctx.Err()
}
if bsp.selfObservabilityEnabled {
err = errors.Join(err, bsp.callbackRegistration.Unregister())
}
})
return err
}
Expand Down Expand Up @@ -272,6 +340,11 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {

if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, int64(l),
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
}
err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
Expand Down Expand Up @@ -380,11 +453,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
case bsp.queue <- sd:
return true
case <-ctx.Done():
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
}
return false
}
}

func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
Expand All @@ -394,6 +473,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
}
}
return false
}
Expand Down
Loading
Loading