diff --git a/.chloggen/arc-feature.yaml b/.chloggen/arc-feature.yaml new file mode 100644 index 00000000000..5b7e8dee0ee --- /dev/null +++ b/.chloggen/arc-feature.yaml @@ -0,0 +1,32 @@ +change_type: enhancement +component: pkg/exporterhelper +note: > + Add Adaptive Request Concurrency (ARC) to exporterhelper: a controller that + dynamically adjusts exporter concurrency based on observed RTT and + backpressure signals. Integrates with the queue/batch send path and exposes + new telemetry. Disabled by default; no behavior change unless enabled. +issues: [14080] +subtext: | + Configuration (exporterhelper / queue-batch settings): + - enabled: turn ARC on/off (default: false) + - initial_concurrency: starting permits (default: 1) + - max_concurrency: upper bound (default: 200) + - decrease_ratio: multiplicative decrease factor on pressure (default: 0.9) + - ewma_alpha: smoothing for RTT EWMA (default: 0.4) + - rtt_deviation_scale: N·σ spike threshold for decrease (default: 2.5) + + Telemetry (new metrics): + - otelcol_exporter_arc_acquire_wait_ms (histogram; attrs: exporter, data_type) + - otelcol_exporter_arc_rtt_ms (histogram; attrs: exporter, data_type) + - otelcol_exporter_arc_failures (sum, monotonic; attrs: exporter, data_type) + - otelcol_exporter_arc_backoff_events (sum, monotonic; attrs: exporter, data_type) + - otelcol_exporter_arc_limit_changes (sum, monotonic; attrs: exporter, data_type, direction=up|down) + - otelcol_exporter_arc_limit (async gauge; attrs: exporter, data_type) + - otelcol_exporter_arc_permits_in_use (async gauge; attrs: exporter, data_type) + + Notes: + - ARC increases concurrency additively and decreases multiplicatively on + backpressure or RTT spikes (mean + rtt_deviation_scale·σ). + - Direction attribute values: "up", "down". +change_logs: [user] + diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 4652383c6a2..ef8e0abebae 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -29,6 +29,7 @@ The following configuration options can be modified: - `bytes`: the size of serialized data in bytes (the least performant option). - `queue_size` (default = 1000): Maximum size the queue can accept. Measured in units defined by `sizer` - `batch`: see below. + - `arc`: see below. #### Sending queue batch settings @@ -50,6 +51,19 @@ Available `batch::sizer` options: - `items`: number of the smallest parts of each signal (spans, metric data points, log records); - `bytes`: the size of serialized data in bytes (the least performant option). + +#### Sending queue Adaptive Concurrency Limiter (ARC) Settings + +The Adaptive Concurrency Limiter (ARC) dynamically adjusts the number of concurrent requests (`num_consumers`) based on observed RTTs and backpressure signals. It aims to maximize throughput while minimizing errors and latency. It is disabled by default. + +- `arc` + - `enabled` (default = false): Set to `true` to enable ARC. + - `initial_limit` (default = 1): The starting concurrency limit. + - `max_concurrency` (default = 200): The maximum number of concurrent requests ARC will allow. + - `decrease_ratio` (default = 0.9): The multiplicative factor to apply when decreasing the limit (e.g., 0.9 = 10% decrease). + - `ewma_alpha` (default = 0.4): The smoothing factor for the EWMA (Exponentially Weighted Moving Average) of RTTs. + - `deviation_scale` (default = 2.5): The number of standard deviations from the mean RTT to tolerate before triggering a backoff. 
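+
+For example, ARC could be enabled on an exporter's sending queue with a configuration along these lines (the `otlp` exporter and endpoint are placeholders; the key names follow the `mapstructure` tags on the ARC configuration struct, and the tuning values shown are the defaults):
+
+```yaml
+exporters:
+  otlp:
+    endpoint: backend:4317
+    sending_queue:
+      enabled: true
+      queue_size: 1000
+      arc:
+        enabled: true
+        initial_concurrency: 1
+        max_concurrency: 200
+        decrease_ratio: 0.9
+        ewma_alpha: 0.4
+        rtt_deviation_scale: 2.5
+```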
+ ### Timeout - `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend diff --git a/exporter/exporterhelper/documentation.md b/exporter/exporterhelper/documentation.md index 095309bed21..200ed6ad869 100644 --- a/exporter/exporterhelper/documentation.md +++ b/exporter/exporterhelper/documentation.md @@ -6,6 +6,68 @@ The following telemetry is emitted by this component. +### otelcol_exporter_arc_acquire_wait_ms + +Time a worker waited to acquire an ARC permit. [Alpha] + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| ms | Histogram | Int | Alpha | + +### otelcol_exporter_arc_backoff_events + +Number of ARC backoff (shrink) events triggered by error or RTT signal. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {events} | Sum | Int | true | Alpha | + +### otelcol_exporter_arc_failures + +Number of requests considered failures by ARC (feeds adaptive shrink). [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {requests} | Sum | Int | true | Alpha | + +### otelcol_exporter_arc_limit + +Current ARC dynamic concurrency limit. [Alpha] + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| {permits} | Gauge | Int | Alpha | + +### otelcol_exporter_arc_limit_changes + +Number of times ARC changed its concurrency limit. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {events} | Sum | Int | true | Alpha | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| direction | up or down | Str: ``up``, ``down`` | + +### otelcol_exporter_arc_permits_in_use + +Number of permits currently acquired. [Alpha] + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| {permits} | Gauge | Int | Alpha | + +### otelcol_exporter_arc_rtt_ms + +Request round-trip-time measured by ARC (from permit acquire to release). [Alpha] + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| ms | Histogram | Int | Alpha | + ### otelcol_exporter_enqueue_failed_log_records Number of log records failed to be added to the sending queue. [Alpha] diff --git a/exporter/exporterhelper/internal/arc/controller.go b/exporter/exporterhelper/internal/arc/controller.go new file mode 100644 index 00000000000..96378671f39 --- /dev/null +++ b/exporter/exporterhelper/internal/arc/controller.go @@ -0,0 +1,331 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/arc" + +import ( + "context" + "math" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" + "go.opentelemetry.io/collector/pipeline" +) + +// Controller coordinates ARC. Internally it uses a control law and a TokenPool gate. +type Controller struct { + cfg Config + pool *TokenPool + + // Telemetry builder (generated by metadata) and instruments. 
+ tel *metadata.TelemetryBuilder + rttInst metric.Int64Histogram + failuresInst metric.Int64Counter + limitChangesInst metric.Int64Counter + limitUpAttrs metric.MeasurementOption + limitDownAttrs metric.MeasurementOption + backoffInst metric.Int64Counter + syncAttrs metric.MeasurementOption + + mu sync.Mutex + st struct { + limit int + inFlight int + credits int // ceiling-hit credits (for additive increase) + pressure bool // any explicit pressure in period + periodStart time.Time + periodDur time.Duration + reMean robustEWMA // robust mean + absdev estimator for RTT + prevRTTMean float64 + prevRTTDev float64 + lastRTTMean float64 + lastRTTDev float64 + } +} + +// NewController creates a new ARC Controller. +func NewController(cfg Config, tel *metadata.TelemetryBuilder, id component.ID, signal pipeline.Signal) *Controller { + def := DefaultConfig() + if cfg.InitialLimit <= 0 { + cfg.InitialLimit = 1 + } + if cfg.MaxConcurrency <= 0 { + cfg.MaxConcurrency = def.MaxConcurrency + } + if cfg.InitialLimit > cfg.MaxConcurrency { + cfg.InitialLimit = cfg.MaxConcurrency + } + if cfg.DecreaseRatio <= 0 || cfg.DecreaseRatio >= 1 { + cfg.DecreaseRatio = def.DecreaseRatio + } + if cfg.EwmaAlpha <= 0 || cfg.EwmaAlpha >= 1 { + cfg.EwmaAlpha = def.EwmaAlpha + } + if cfg.DeviationScale < 0 { + cfg.DeviationScale = def.DeviationScale + } + + // Attributes used on emitted metrics. + exporterAttr := attribute.String("exporter", id.String()) + dataTypeAttr := attribute.String("data_type", signal.String()) + syncAttrs := metric.WithAttributeSet(attribute.NewSet(exporterAttr, dataTypeAttr)) + limitUpAttrs := metric.WithAttributeSet(attribute.NewSet( + exporterAttr, dataTypeAttr, attribute.String("direction", "up"), + )) + limitDownAttrs := metric.WithAttributeSet(attribute.NewSet( + exporterAttr, dataTypeAttr, attribute.String("direction", "down"), + )) + + c := &Controller{ + cfg: cfg, + pool: newTokenPool(cfg.InitialLimit), + tel: tel, + rttInst: tel.ExporterArcRttMs, + failuresInst: tel.ExporterArcFailures, + limitChangesInst: tel.ExporterArcLimitChanges, + limitUpAttrs: limitUpAttrs, + limitDownAttrs: limitDownAttrs, + backoffInst: tel.ExporterArcBackoffEvents, + syncAttrs: syncAttrs, + } + + c.st.limit = cfg.InitialLimit + c.st.reMean = newRobustEWMA(cfg.EwmaAlpha) + c.st.periodStart = time.Now() + c.st.periodDur = clampDur(minPeriod, maxPeriod, 300*time.Millisecond) // initial period + + // Register async gauges. + _ = tel.RegisterExporterArcLimitCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(int64(c.CurrentLimit()), syncAttrs) + return nil + }) + _ = tel.RegisterExporterArcPermitsInUseCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(int64(c.PermitsInUse()), syncAttrs) + return nil + }) + + return c +} + +// controlStep applies the ARC control law at the end of a period. +// Caller must hold c.mu. +func (c *Controller) controlStep(ctx context.Context) { + limitBefore := c.st.limit + + // If we don't have a prior baseline yet, treat RTT spike detection as disabled + // (no isSpike) but still allow explicit-pressure decreases in cold start. + hasBaseline := c.st.prevRTTMean > 0 + threshold := c.st.prevRTTMean + c.cfg.DeviationScale*c.st.prevRTTDev + isSpike := hasBaseline && c.st.reMean.initialized() && c.st.lastRTTMean > threshold + + // Decrease on explicit pressure or RTT spike. 
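+ // Illustrative numbers: with prevRTTMean = 100ms, prevRTTDev = 10ms and
+ // DeviationScale = 2.5 the spike threshold is 100 + 2.5*10 = 125ms; a period
+ // whose smoothed RTT ends above that (or that saw explicit pressure) takes the
+ // multiplicative step below, e.g. limit 10 with DecreaseRatio 0.9 becomes
+ // floor(10*0.9) = 9.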
+ if c.st.limit > 1 && (c.st.pressure || isSpike) { + newLimit := int(math.Floor(float64(c.st.limit) * c.cfg.DecreaseRatio)) + if newLimit < 1 { + newLimit = 1 + } + if dec := c.st.limit - newLimit; dec > 0 { + c.pool.Shrink(dec) + c.st.limit = newLimit + if c.tel != nil { + if c.backoffInst != nil { + c.backoffInst.Add(contextOrBG(ctx), 1, c.syncAttrs) + } + if c.limitChangesInst != nil && c.st.limit != limitBefore { + c.limitChangesInst.Add(contextOrBG(ctx), 1, c.limitDownAttrs) + } + } + c.st.credits = 0 + c.st.pressure = false + return + } + } + + // Additive increase is *disabled* until we have a prior baseline (first roll). + // This avoids a spurious +1 when tests "poke" the controller to roll the period. + canIncreaseRTT := !c.st.reMean.initialized() || c.st.lastRTTMean <= threshold + if hasBaseline && // <- require baseline to allow additive increase + c.st.limit < c.cfg.MaxConcurrency && + !c.st.pressure && + c.st.credits >= requiredCredits(c.st.limit) && + canIncreaseRTT { + c.pool.Grow(1) + c.st.limit++ + if c.tel != nil && c.limitChangesInst != nil && c.st.limit != limitBefore { + c.limitChangesInst.Add(contextOrBG(ctx), 1, c.limitUpAttrs) + } + c.st.credits = 0 // reset after successful increase + return + } + + // Modest credit decay if no change happened. + c.st.credits = int(float64(c.st.credits) * 0.5) + c.st.pressure = false +} + +func (c *Controller) Shutdown() { + if c == nil { + return + } + if c.tel != nil { + c.tel.Shutdown() + } + if c.pool != nil { + c.pool.Close() + } +} + +// Acquire obtains a permit unless ARC is disabled or the context is cancelled. +func (c *Controller) Acquire(ctx context.Context) bool { + // Fast-path bypass: if ARC is disabled, do not gate. + if !c.cfg.Enabled { + return true + } + // TokenPool.Acquire returns an error; return true when no error (permit obtained). + return c.pool.Acquire(ctx) == nil +} + +// Release returns a permit. +func (c *Controller) Release() { c.pool.Release() } + +// ReleaseWithSample reports a sample (RTT + classification) and releases. +func (c *Controller) ReleaseWithSample(ctx context.Context, rtt time.Duration, success, backpressure bool) { + c.Feedback(ctx, rtt, success, backpressure) +} + +// StartRequest increments in-flight accounting and emits "in use" telemetry. +// Keep this as a no-op when disabled. +func (c *Controller) StartRequest() { + if !c.cfg.Enabled { + return + } + + c.mu.Lock() + // ===== existing in-flight++ and "reachedLimit" logic ===== + c.st.inFlight++ + if c.st.inFlight >= c.st.limit { + // record a ceiling hit as a credit for additive increase logic + c.st.credits++ + } + // Emit "permits in use" observable via tel, if you have it + // (Leave as-is if already wired; this is illustrative.) + // c.tel.RecordExporterArcPermitsInUse(int64(c.st.inFlight), c.attrs...) + c.mu.Unlock() +} + +func (c *Controller) CurrentLimit() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.st.limit +} + +func (c *Controller) PermitsInUse() int { + c.pool.mu.Lock() + defer c.pool.mu.Unlock() + return c.pool.inUse +} + +// Feedback reports a sample (RTT + classification) and releases the permit. +// It must be called exactly once for every StartRequest that has a corresponding Acquire=true. +func (c *Controller) Feedback(ctx context.Context, rtt time.Duration, success, isBackpressure bool) { + // Ensure the permit is always released. + if c.cfg.Enabled { + // Release via the pool to match other Release implementations. + c.pool.Release() + } + + // No controller updates when disabled. 
+ if !c.cfg.Enabled { + return + } + + // RTT is provided by the caller. + c.mu.Lock() + defer c.mu.Unlock() + + // ===== existing in-flight-- ===== + if c.st.inFlight > 0 { + c.st.inFlight-- + } + + // ===== robust-EWMA update with rtt when success ===== + if success { + // Update EWMA using RTT in milliseconds as a float64 + ms := float64(rtt.Milliseconds()) + c.st.lastRTTMean, c.st.lastRTTDev = c.st.reMean.update(ms) + if c.rttInst != nil { + c.rttInst.Record(contextOrBG(ctx), rtt.Milliseconds(), c.syncAttrs) + } + } + + // Backpressure and failure accounting + if isBackpressure { + c.st.pressure = true + if c.failuresInst != nil { + c.failuresInst.Add(contextOrBG(ctx), 1, c.syncAttrs) + } + } else if !success { + // Non-backpressure failures are also counted. + if c.failuresInst != nil { + c.failuresInst.Add(contextOrBG(ctx), 1, c.syncAttrs) + } + } + + // ===== periodic control step / AIMD ===== + now := time.Now() + if now.Sub(c.st.periodStart) > c.st.periodDur { + // Call the controlStep + c.controlStep(ctx) + + // Update prev RTT stats *after* controlStep uses them + c.st.prevRTTMean = c.st.lastRTTMean + c.st.prevRTTDev = c.st.lastRTTDev + + // Reset for next period + c.st.periodStart = now + + // Recompute next control interval from observed RTT, clamped + if c.st.reMean.initialized() { + // Use mean RTT + 1 deviation as the period duration + newDur := time.Duration(c.st.lastRTTMean+c.st.lastRTTDev) * time.Millisecond + c.st.periodDur = clampDur(minPeriod, maxPeriod, newDur) + } + } +} + +// requiredCredits tunes how aggressively we add permits. +func requiredCredits(limit int) int { + if limit <= 1 { + return 1 + } + if limit < 8 { + return 2 + } + if limit < 32 { + return 3 + } + return 4 +} + +func clampDur(minDur, maxDur, v time.Duration) time.Duration { + if v < minDur { + return minDur + } + if v > maxDur { + return maxDur + } + return v +} + +// contextOrBG returns the provided context or context.Background() when nil. +func contextOrBG(ctx context.Context) context.Context { + if ctx != nil { + return ctx + } + return context.Background() +} diff --git a/exporter/exporterhelper/internal/arc/controller_test.go b/exporter/exporterhelper/internal/arc/controller_test.go new file mode 100644 index 00000000000..c786e21b490 --- /dev/null +++ b/exporter/exporterhelper/internal/arc/controller_test.go @@ -0,0 +1,378 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc + +import ( + "context" + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" + "go.opentelemetry.io/collector/pipeline" +) + +// newTestController creates a controller with NOP telemetry. +func newTestController(t *testing.T, cfg Config) *Controller { + set := componenttest.NewNopTelemetrySettings() + tel, err := metadata.NewTelemetryBuilder(set) + require.NoError(t, err) + return NewController(cfg, tel, component.MustNewID("test"), pipeline.SignalTraces) +} + +// forceControlStep triggers a feedback loop that is guaranteed to be after the period duration. 
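+// It feeds the current EWMA mean back as the RTT, so the extra sample is
+// effectively neutral (it neither shifts the smoothed RTT nor reads as a spike)
+// and the call mainly serves to roll the control period.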
+func forceControlStep(c *Controller) { + // Wait past the period and poke again to trigger controlStep + time.Sleep(c.st.periodDur + 1*time.Millisecond) + c.StartRequest() + c.ReleaseWithSample(context.Background(), time.Duration(c.st.lastRTTMean*float64(time.Millisecond)), true, false) +} + +func TestController_NewShutdown(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + c := newTestController(t, cfg) + require.NotNil(t, c) + assert.Equal(t, cfg.InitialLimit, c.CurrentLimit()) + c.Shutdown() +} + +func TestController_ConfigClamping(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + // Intentionally bad values that should be clamped to defaults + cfg.InitialLimit = -5 + cfg.MaxConcurrency = 0 + cfg.DecreaseRatio = 1.5 + cfg.EwmaAlpha = -1.0 + cfg.DeviationScale = -2.0 + + c := newTestController(t, cfg) + require.NotNil(t, c) + + // NewController should have clamped these per DefaultConfig()+rules + def := DefaultConfig() + assert.GreaterOrEqual(t, c.cfg.InitialLimit, 1) + assert.Equal(t, def.MaxConcurrency, c.cfg.MaxConcurrency) + assert.InDelta(t, def.DecreaseRatio, c.cfg.DecreaseRatio, 1e-9) + assert.InDelta(t, def.EwmaAlpha, c.cfg.EwmaAlpha, 1e-9) + assert.InDelta(t, def.DeviationScale, c.cfg.DeviationScale, 1e-9) + + // Also ensure InitialLimit <= MaxConcurrency + assert.LessOrEqual(t, c.cfg.InitialLimit, c.cfg.MaxConcurrency) + c.Shutdown() +} + +func TestController_AcquireRelease(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 2 + c := newTestController(t, cfg) + require.NotNil(t, c) + + // Acquire two permits + assert.True(t, c.Acquire(context.Background())) + assert.Equal(t, 1, c.PermitsInUse()) + assert.True(t, c.Acquire(context.Background())) + assert.Equal(t, 2, c.PermitsInUse()) + + // Third acquire should block and fail with timeout + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + assert.False(t, c.Acquire(ctx)) + assert.Equal(t, 2, c.PermitsInUse()) + + // Release one and acquire again + c.Release() + assert.Equal(t, 1, c.PermitsInUse()) + assert.True(t, c.Acquire(context.Background())) + assert.Equal(t, 2, c.PermitsInUse()) + + c.Shutdown() +} + +func TestController_AcquireRespectsContextCancel(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 1 + + c := newTestController(t, cfg) + require.NotNil(t, c) + + // Take the only permit + assert.True(t, c.Acquire(context.Background())) + + // Second acquire should obey context cancellation + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + ok := c.Acquire(ctx) + assert.False(t, ok) + + // Cleanup + c.Release() + c.Shutdown() +} + +func TestController_StartRequestCreditsOnlyOnSaturation(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 2 + + c := newTestController(t, cfg) + + // Force a short period so control steps run frequently in other tests, + // but here we're just testing credit accrual behavior. + c.mu.Lock() + c.st.periodDur = 10 * time.Millisecond + c.mu.Unlock() + + // At start, no in-flight and no credits. + assert.Equal(t, 0, c.st.inFlight) + assert.Equal(t, 0, c.st.credits) + + // First request (inFlight=1 < limit=2) -> no credit. + c.StartRequest() + assert.Equal(t, 1, c.st.inFlight) + assert.Equal(t, 0, c.st.credits) + + // Second request (inFlight=2 == limit=2) -> credit++. 
+ c.StartRequest() + assert.Equal(t, 2, c.st.inFlight) + assert.Equal(t, 1, c.st.credits) + + // Third request (inFlight=3 > limit=2) -> credit++. + c.StartRequest() + assert.Equal(t, 3, c.st.inFlight) + assert.Equal(t, 2, c.st.credits) + + // Release all three via ReleaseWithSample which also calls Release(). + c.ReleaseWithSample(context.Background(), 10*time.Millisecond, true, false) + c.ReleaseWithSample(context.Background(), 10*time.Millisecond, true, false) + c.ReleaseWithSample(context.Background(), 10*time.Millisecond, true, false) + assert.Equal(t, 0, c.st.inFlight) + + c.Shutdown() +} + +func TestController_Feedback_UpdatesEWMAOnlyOnSuccess(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 1 + cfg.EwmaAlpha = 0.5 + + c := newTestController(t, cfg) + + // First successful sample initializes the robust EWMA + c.StartRequest() + c.ReleaseWithSample(context.Background(), 100*time.Millisecond, true, false) + require.True(t, c.st.reMean.initialized()) + mean1 := c.st.lastRTTMean + dev1 := c.st.lastRTTDev + + // Failed/backpressure=false sample should NOT update EWMA + c.StartRequest() + c.ReleaseWithSample(context.Background(), 400*time.Millisecond, false, false) + assert.InDelta(t, mean1, c.st.lastRTTMean, 1e-9) + assert.InDelta(t, dev1, c.st.lastRTTDev, 1e-9) + // Successful sample should update EWMA + c.StartRequest() + c.ReleaseWithSample(context.Background(), 200*time.Millisecond, true, false) + assert.Greater(t, math.Abs(mean1-c.st.lastRTTMean), 1e-9) + + c.Shutdown() +} + +func TestController_AdditiveIncreaseAndCreditReset(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 1 + cfg.MaxConcurrency = 5 + cfg.EwmaAlpha = 0.5 + cfg.DeviationScale = 2.0 + + c := newTestController(t, cfg) + // Short test period + c.mu.Lock() + c.st.periodDur = 10 * time.Millisecond + c.mu.Unlock() + + // Prime EWMA with a stable RTT (so no spike) + c.StartRequest() + c.ReleaseWithSample(context.Background(), 100*time.Millisecond, true, false) + + // Force a control step to set prevRTTMean + forceControlStep(c) + assert.Equal(t, 1, c.CurrentLimit()) // Should not have increased + assert.Positive(t, c.st.prevRTTMean) + + // Build up enough credits to trigger an increase. + reqCredits := requiredCredits(c.CurrentLimit()) + for i := 0; i < reqCredits; i++ { + c.StartRequest() + } + // Release these; we want the control step to see saturation and no pressure. 
+ for i := 0; i < reqCredits; i++ { + c.ReleaseWithSample(context.Background(), 100*time.Millisecond, true, false) + } + + // Wait past the period and poke again to trigger controlStep + forceControlStep(c) + + assert.Equal(t, 2, c.CurrentLimit(), "limit should increase additively by 1") + assert.Equal(t, 0, c.st.credits, "credits reset after increase") + + c.Shutdown() +} + +func TestController_MultiplicativeDecreaseOnBackpressure(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 4 + cfg.MaxConcurrency = 10 + cfg.DecreaseRatio = 0.5 // make the effect obvious + + c := newTestController(t, cfg) + require.Equal(t, 4, c.CurrentLimit()) + + // Short control period + c.mu.Lock() + c.st.periodDur = 10 * time.Millisecond + c.mu.Unlock() + + // Cause an explicit backpressure signal + c.StartRequest() + c.ReleaseWithSample(context.Background(), 50*time.Millisecond, true, true) + + // Wait and trigger the control step + forceControlStep(c) + + // newLimit = floor(4 * 0.5) = 2 + assert.Equal(t, 2, c.CurrentLimit()) + assert.Equal(t, 0, c.st.credits) + + c.Shutdown() +} + +func TestController_DecreaseOnRTTSpike(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 4 + cfg.DeviationScale = 1.0 + cfg.EwmaAlpha = 0.5 + + c := newTestController(t, cfg) + + // Short control period + c.mu.Lock() + c.st.periodDur = 10 * time.Millisecond + c.mu.Unlock() + + // Establish baseline RTT ~100ms + c.StartRequest() + c.ReleaseWithSample(context.Background(), 100*time.Millisecond, true, false) + + // Force a control step to set prevRTTMean + forceControlStep(c) + assert.Equal(t, 4, c.CurrentLimit()) + assert.Positive(t, c.st.prevRTTMean) + + // Trigger a spike sample + c.StartRequest() + c.ReleaseWithSample(context.Background(), 600*time.Millisecond, true, false) + + // After control step, limit should drop by DecreaseRatio (default 0.9) + forceControlStep(c) + + // Expect a decrease by floor(4*0.9) = 3 (one step down) + assert.Equal(t, 3, c.CurrentLimit()) + c.Shutdown() +} + +func TestController_MinFloorAndMaxCap(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 1 + cfg.MaxConcurrency = 2 + cfg.DecreaseRatio = 0.1 + + c := newTestController(t, cfg) + + // Try to decrease at the floor; should stay at 1 + c.mu.Lock() + c.st.periodDur = 10 * time.Millisecond + c.mu.Unlock() + + c.StartRequest() + c.ReleaseWithSample(context.Background(), 50*time.Millisecond, true, true) // pressure + forceControlStep(c) + assert.Equal(t, 1, c.CurrentLimit(), "should not go below 1") + + // Build credits to increase to the cap = 2 + reqCredits := requiredCredits(c.CurrentLimit()) + for i := 0; i < reqCredits; i++ { + c.StartRequest() + } + for i := 0; i < reqCredits; i++ { + c.ReleaseWithSample(context.Background(), 50*time.Millisecond, true, false) + } + forceControlStep(c) + assert.Equal(t, 2, c.CurrentLimit(), "should increase to cap") + + // Further credit should not exceed MaxConcurrency + reqCredits = requiredCredits(c.CurrentLimit()) + for i := 0; i < reqCredits; i++ { + c.StartRequest() + } + for i := 0; i < reqCredits; i++ { + c.ReleaseWithSample(context.Background(), 50*time.Millisecond, true, false) + } + forceControlStep(c) + assert.Equal(t, 2, c.CurrentLimit(), "should not exceed cap") + + c.Shutdown() +} + +func TestController_ReleaseWithSampleReleases(t *testing.T) { + cfg := DefaultConfig() + cfg.Enabled = true + cfg.InitialLimit = 1 + + c := newTestController(t, cfg) + + // Acquire via the pool to bump inUse + assert.True(t, 
c.Acquire(context.Background())) + assert.Equal(t, 1, c.PermitsInUse()) + + // StartRequest increments inFlight; ReleaseWithSample must also release the pool permit + c.StartRequest() + c.ReleaseWithSample(context.Background(), 10*time.Millisecond, true, false) + + assert.Equal(t, 0, c.PermitsInUse()) + c.Shutdown() +} + +func Test_requiredCredits(t *testing.T) { + assert.Equal(t, 1, requiredCredits(0)) + assert.Equal(t, 1, requiredCredits(1)) + assert.Equal(t, 2, requiredCredits(2)) + assert.Equal(t, 2, requiredCredits(7)) + assert.Equal(t, 3, requiredCredits(8)) + assert.Equal(t, 3, requiredCredits(31)) + assert.Equal(t, 4, requiredCredits(32)) + assert.Equal(t, 4, requiredCredits(100)) +} + +func Test_contextOrBG(t *testing.T) { + var nilCtx context.Context // avoid SA1012 by not passing a literal nil + bg := contextOrBG(nilCtx) + require.NotNil(t, bg) + + ctx := context.Background() + require.Equal(t, ctx, contextOrBG(ctx)) +} diff --git a/exporter/exporterhelper/internal/arc/ewma.go b/exporter/exporterhelper/internal/arc/ewma.go new file mode 100644 index 00000000000..35b11e30aac --- /dev/null +++ b/exporter/exporterhelper/internal/arc/ewma.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/arc" + +import "math" // robustEWMA tracks an exponentially-weighted moving *mean* and a robust +// dispersion proxy: the EWMA of absolute residuals, i.e., E[ |x - mean_prev| ]. +// This behaves similarly to a mean + K*sigma threshold but is cheaper and avoids +// squaring; it is less sensitive to outliers than naive variance updates. + +type robustEWMA struct { + alpha float64 + init bool + mean float64 + absDev float64 // EWMA of absolute residuals relative to previous mean +} + +func newRobustEWMA(alpha float64) robustEWMA { + return robustEWMA{alpha: alpha} +} + +func (e *robustEWMA) initialized() bool { return e.init } + +// update returns the new mean and absDev after ingesting x. 
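+// For an already-initialized estimator the recurrences are
+//
+//	mean'   = alpha*x + (1-alpha)*mean
+//	absDev' = alpha*|x - mean| + (1-alpha)*absDev
+//
+// where the residual is taken against the mean from before this update.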
+func (e *robustEWMA) update(x float64) (float64, float64) { + if math.IsNaN(x) || math.IsInf(x, 0) { + return e.mean, e.absDev + } + + if !e.init { + e.mean = x + e.absDev = 0 + e.init = true + return e.mean, e.absDev + } + prev := e.mean + e.mean = e.alpha*x + (1-e.alpha)*e.mean + resid := x - prev + if resid < 0 { + resid = -resid + } + e.absDev = e.alpha*resid + (1-e.alpha)*e.absDev + return e.mean, e.absDev +} diff --git a/exporter/exporterhelper/internal/arc/ewma_test.go b/exporter/exporterhelper/internal/arc/ewma_test.go new file mode 100644 index 00000000000..98f25b8b693 --- /dev/null +++ b/exporter/exporterhelper/internal/arc/ewma_test.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + testAlpha = 0.4 + testEpsilon = 1e-9 +) + +func TestRobustEWMA_Initial(t *testing.T) { + e := newRobustEWMA(testAlpha) + assert.False(t, e.initialized()) + assert.InDelta(t, testAlpha, e.alpha, testEpsilon) + + // First update initializes + mean, absDev := e.update(10) + assert.True(t, e.initialized()) + assert.InDelta(t, 10.0, mean, testEpsilon) + assert.InDelta(t, 0.0, absDev, testEpsilon) + assert.InDelta(t, 10.0, e.mean, testEpsilon) + assert.InDelta(t, 0.0, e.absDev, testEpsilon) +} + +func TestRobustEWMA_Updates(t *testing.T) { + e := newRobustEWMA(testAlpha) + + // Init + e.update(10) + + // Second update + // mean = 0.4 * 20 + 0.6 * 10 = 8 + 6 = 14 + // resid = 20 - 10 = 10 + // absDev = 0.4 * 10 + 0.6 * 0 = 4 + mean, absDev := e.update(20) + assert.InDelta(t, 14.0, mean, testEpsilon) + assert.InDelta(t, 4.0, absDev, testEpsilon) + assert.InDelta(t, 14.0, e.mean, testEpsilon) + assert.InDelta(t, 4.0, e.absDev, testEpsilon) + + // Third update + // prev = 14 + // mean = 0.4 * 12 + 0.6 * 14 = 4.8 + 8.4 = 13.2 + // resid = 12 - 14 = -2 -> 2 (abs) + // absDev = 0.4 * 2 + 0.6 * 4 = 0.8 + 2.4 = 3.2 + mean, absDev = e.update(12) + assert.InDelta(t, 13.2, mean, testEpsilon) + assert.InDelta(t, 3.2, absDev, testEpsilon) + assert.InDelta(t, 13.2, e.mean, testEpsilon) + assert.InDelta(t, 3.2, e.absDev, testEpsilon) + + // Fourth update (check negative residual logic) + // prev = 13.2 + // mean = 0.4 * 30 + 0.6 * 13.2 = 12 + 7.92 = 19.92 + // resid = 30 - 13.2 = 16.8 + // absDev = 0.4 * 16.8 + 0.6 * 3.2 = 6.72 + 1.92 = 8.64 + mean, absDev = e.update(30) + assert.InDelta(t, 19.92, mean, testEpsilon) + assert.InDelta(t, 8.64, absDev, testEpsilon) + assert.InDelta(t, 19.92, e.mean, testEpsilon) + assert.InDelta(t, 8.64, e.absDev, testEpsilon) +} + +func TestRobustEWMA_NaN(t *testing.T) { + e := newRobustEWMA(testAlpha) + e.update(10) + e.update(20) + + // Update with NaN + e.update(math.NaN()) + + // Should not propagate NaN + assert.False(t, math.IsNaN(e.mean)) + assert.False(t, math.IsNaN(e.absDev)) + + // Should not propagate Inf + e.update(math.Inf(1)) + assert.False(t, math.IsInf(e.mean, 1)) + assert.False(t, math.IsInf(e.absDev, 1)) +} diff --git a/exporter/exporterhelper/internal/arc/params.go b/exporter/exporterhelper/internal/arc/params.go new file mode 100644 index 00000000000..c69d38bf2f9 --- /dev/null +++ b/exporter/exporterhelper/internal/arc/params.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/arc" + +import ( + "time" +) + +// Config exposes the knobs required by exporterhelper queue integration. 
+// Ranges (validated by NewController): +// - InitialLimit >= 1 +// - MaxConcurrency >= InitialLimit +// - DecreaseRatio in (0, 1) +// - EwmaAlpha in (0, 1) +// - DeviationScale >= 0 +// +// ControlPeriod bounds (derived): [minPeriod, maxPeriod] where +// minPeriod = 50ms, maxPeriod = 2s. +// The target period is derived from the robust EWMA mean but clamped to +// these bounds to avoid pathological long/short windows. + +type Config struct { + Enabled bool `mapstructure:"enabled"` + InitialLimit int `mapstructure:"initial_concurrency"` + MaxConcurrency int `mapstructure:"max_concurrency"` + DecreaseRatio float64 `mapstructure:"decrease_ratio"` + EwmaAlpha float64 `mapstructure:"ewma_alpha"` + DeviationScale float64 `mapstructure:"rtt_deviation_scale"` +} + +func DefaultConfig() Config { + return Config{ + Enabled: false, + InitialLimit: 1, + MaxConcurrency: 200, + DecreaseRatio: 0.9, + EwmaAlpha: 0.4, + DeviationScale: 2.5, + } +} + +// control period clamps +const ( + minPeriod = 50 * time.Millisecond + maxPeriod = 2 * time.Second +) diff --git a/exporter/exporterhelper/internal/arc/token_pool.go b/exporter/exporterhelper/internal/arc/token_pool.go new file mode 100644 index 00000000000..c8e78d1ba90 --- /dev/null +++ b/exporter/exporterhelper/internal/arc/token_pool.go @@ -0,0 +1,117 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/arc" + +import ( + "context" + "errors" + "sync" +) + +// TokenPool is a fair gate for request concurrency: +// - cap: maximum in-flight +// - inUse: current in-flight +// Acquire blocks while inUse >= cap. Shrinking is done by reducing cap; no +// "debt" bookkeeping or forgetting permits. This deliberately differs from a +// shrinkable semaphore that accrues forgets. + +type TokenPool struct { + mu sync.Mutex + cond *sync.Cond + cap int + inUse int + dead bool +} + +func newTokenPool(initial int) *TokenPool { + p := &TokenPool{cap: initial} + p.cond = sync.NewCond(&p.mu) + return p +} + +func (p *TokenPool) Close() { + p.mu.Lock() + defer p.mu.Unlock() + p.dead = true + p.cond.Broadcast() +} + +func (p *TokenPool) Acquire(ctx context.Context) error { + // Fast fail if already canceled. + if ctx != nil && ctx.Err() != nil { + return ctx.Err() + } + + p.mu.Lock() + defer p.mu.Unlock() + + if p.dead { + return errors.New("token pool closed") + } + + // Wake on context cancellation; ensure goroutine exits via close(done). + done := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + p.cond.Broadcast() + case <-done: + } + }() + defer close(done) + + for !p.dead && p.inUse >= p.cap { + p.cond.Wait() + + // Re-check cancellation and pool state after each wake. 
+ if ctx != nil && ctx.Err() != nil { + return ctx.Err() + } + if p.dead { + return errors.New("token pool closed") + } + } + + if p.dead { + return errors.New("token pool closed") + } + if ctx != nil && ctx.Err() != nil { + return ctx.Err() + } + + p.inUse++ + return nil +} + +func (p *TokenPool) Release() { + p.mu.Lock() + p.inUse-- + if p.inUse < 0 { + p.inUse = 0 + } + p.mu.Unlock() + p.cond.Signal() +} + +func (p *TokenPool) Grow(n int) { + if n <= 0 { + return + } + p.mu.Lock() + p.cap += n + p.mu.Unlock() + p.cond.Broadcast() +} + +func (p *TokenPool) Shrink(n int) { + if n <= 0 { + return + } + p.mu.Lock() + p.cap -= n + if p.cap < 1 { + p.cap = 1 + } + p.mu.Unlock() +} diff --git a/exporter/exporterhelper/internal/arc/token_pool_test.go b/exporter/exporterhelper/internal/arc/token_pool_test.go new file mode 100644 index 00000000000..b7b0e0e3d53 --- /dev/null +++ b/exporter/exporterhelper/internal/arc/token_pool_test.go @@ -0,0 +1,193 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arc + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTokenPool_New(t *testing.T) { + p := newTokenPool(10) + assert.Equal(t, 10, p.cap) + assert.Equal(t, 0, p.inUse) + assert.NotNil(t, p.cond) + assert.False(t, p.dead) +} + +func TestTokenPool_AcquireRelease(t *testing.T) { + p := newTokenPool(2) + + // Acquire first permit + err := p.Acquire(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, p.inUse) + + // Acquire second permit + err = p.Acquire(context.Background()) + require.NoError(t, err) + assert.Equal(t, 2, p.inUse) + + // Release first permit + p.Release() + assert.Equal(t, 1, p.inUse) + + // Acquire again + err = p.Acquire(context.Background()) + require.NoError(t, err) + assert.Equal(t, 2, p.inUse) + + // Release both + p.Release() + p.Release() + assert.Equal(t, 0, p.inUse) +} + +func TestTokenPool_Release_NoOverRelease(t *testing.T) { + p := newTokenPool(1) + p.Release() + assert.Equal(t, 0, p.inUse) // Should not go negative +} + +func TestTokenPool_Blocking(t *testing.T) { + p := newTokenPool(1) + + // Acquire the only permit + err := p.Acquire(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, p.inUse) + + // This next acquire should block + blockerDone := make(chan struct{}) + go func() { + err := p.Acquire(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 1, p.inUse) + p.Release() + close(blockerDone) + }() + + // Give goroutine time to block + time.Sleep(20 * time.Millisecond) + assert.Equal(t, 1, p.inUse) + + // Release the permit + p.Release() + + // The goroutine should now unblock + select { + case <-blockerDone: + // success + case <-time.After(100 * time.Millisecond): + t.Fatal("goroutine did not unblock after release") + } + + assert.Equal(t, 0, p.inUse) +} + +func TestTokenPool_Close(t *testing.T) { + p := newTokenPool(1) + + // Acquire the only permit + err := p.Acquire(context.Background()) + require.NoError(t, err) + + // This next acquire should block + blockerDone := make(chan error, 1) + go func() { + blockerDone <- p.Acquire(context.Background()) + }() + + // Give goroutine time to block + time.Sleep(20 * time.Millisecond) + + // Close the pool + p.Close() + assert.True(t, p.dead) + + // The goroutine should unblock with an error + select { + case closeErr := <-blockerDone: // Renamed 'err' to 'closeErr' to avoid shadow + require.Error(t, closeErr) + assert.Contains(t, 
closeErr.Error(), "token pool closed") + case <-time.After(100 * time.Millisecond): + t.Fatal("goroutine did not unblock after Close") + } + + // New acquires should also fail + err = p.Acquire(context.Background()) + require.Error(t, err) + assert.Contains(t, err.Error(), "token pool closed") +} + +func TestTokenPool_Grow(t *testing.T) { + p := newTokenPool(1) + + // Acquire the only permit + err := p.Acquire(context.Background()) + require.NoError(t, err) + + // This next acquire should block + blockerDone := make(chan struct{}) + go func() { + err := p.Acquire(context.Background()) + assert.NoError(t, err) + close(blockerDone) + }() + + // Give goroutine time to block + time.Sleep(20 * time.Millisecond) + + // Grow the pool + p.Grow(1) + assert.Equal(t, 2, p.cap) + + // The goroutine should unblock + select { + case <-blockerDone: + // success + case <-time.After(100 * time.Millisecond): + t.Fatal("goroutine did not unblock after Grow") + } + + assert.Equal(t, 2, p.inUse) +} + +func TestTokenPool_Acquire_ContextCanceled(t *testing.T) { + p := newTokenPool(1) + + // Acquire the only permit + err := p.Acquire(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, p.inUse) + + // This next acquire should block + ctx, cancel := context.WithCancel(context.Background()) + blockerDone := make(chan error, 1) + go func() { + blockerDone <- p.Acquire(ctx) + }() + + // Give goroutine time to block + time.Sleep(20 * time.Millisecond) + + // Cancel the context + cancel() + + // The goroutine should unblock with an error + select { + case acqErr := <-blockerDone: // This line fixes the shadow error + require.Error(t, acqErr) + require.ErrorIs(t, acqErr, context.Canceled) + case <-time.After(100 * time.Millisecond): + t.Fatal("goroutine did not unblock after context cancel") + } + + // The permit was not acquired + assert.Equal(t, 1, p.inUse) +} diff --git a/exporter/exporterhelper/internal/experr/retryable_error.go b/exporter/exporterhelper/internal/experr/retryable_error.go new file mode 100644 index 00000000000..3d960bb4da6 --- /dev/null +++ b/exporter/exporterhelper/internal/experr/retryable_error.go @@ -0,0 +1,136 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package experr + +import ( + "context" + "errors" + "net" + "net/http" + "regexp" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Reason is emitted to logs/metrics when desired. +// You don't have to use it in telemetry if your metadata builder +// doesn't have a "reason" attribute yet. 
+type Reason string + +const ( + ReasonGRPCResourceExhausted Reason = "grpc_resource_exhausted" + ReasonGRPCUnavailable Reason = "grpc_unavailable" + ReasonGRPCDeadline Reason = "grpc_deadline_exceeded" + ReasonDeadline Reason = "deadline_exceeded" + ReasonHTTP429 Reason = "http_429" + ReasonHTTP503 Reason = "http_503" + ReasonHTTP504 Reason = "http_504" + ReasonHTTPRetryAfter Reason = "http_retry_after" + ReasonNetTimeout Reason = "net_timeout" + ReasonTextCircuitBreaker Reason = "text:circuit_breaker" + ReasonTextOverload Reason = "text:overload" + ReasonTextBackpressure Reason = "text:backpressure" +) + +var fallbackRegex = regexp.MustCompile( + `(?i)(429|too many requests)|` + // ReasonHTTP429 + `(503|service unavailable)|` + // ReasonHTTP503 + `(504|gateway timeout|upstream timeout)|` + // ReasonHTTP504 + `(retry-after)|` + // ReasonHTTPRetryAfter + `(circuit breaker)|` + // ReasonTextCircuitBreaker + `(overload)|` + // ReasonTextOverload + `(backpressure)`, // ReasonTextBackpressure +) + +// ClassifyErrorReason returns (isRetryable, reason). +// The logic is strictly: +// 1. Typed signals first (fast, no allocs) +// 2. Fallback to a single regex match on the error string +// +// NOTE: This function runs only on error paths. Even then, +// typed checks return early. Fallback string scanning is a +// micro-cost compared to network I/O and retry backoff. +func ClassifyErrorReason(err error) (bool, Reason) { + if err == nil { + return false, "" + } + + // 1) Context deadline/timeout + if errors.Is(err, context.DeadlineExceeded) { + return true, ReasonDeadline + } + + // 2) net.Error timeouts + var nerr net.Error + if errors.As(err, &nerr) && nerr.Timeout() { + return true, ReasonNetTimeout + } + + // 3) gRPC statuses (common in OTLP/gRPC) + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.ResourceExhausted: + return true, ReasonGRPCResourceExhausted + case codes.Unavailable: + return true, ReasonGRPCUnavailable + case codes.DeadlineExceeded: + return true, ReasonGRPCDeadline + } + } + + // 4) Wrapped HTTP response providers (if your error wraps response) + // Define a minimal interface to avoid importing your transport layer. + type respCarrier interface{ Response() *http.Response } + var rc respCarrier + if errors.As(err, &rc) { + if r := rc.Response(); r != nil { + switch r.StatusCode { + case http.StatusTooManyRequests: + return true, ReasonHTTP429 + case http.StatusServiceUnavailable: + return true, ReasonHTTP503 + case http.StatusGatewayTimeout: + return true, ReasonHTTP504 + } + if r.Header.Get("Retry-After") != "" { + return true, ReasonHTTPRetryAfter + } + } + } + + // 5) Fallback: cheap regex match on error string. + matches := fallbackRegex.FindStringSubmatch(err.Error()) + if matches == nil { + return false, "" + } + + // The regex has 7 capture groups, one for each reason. + // FindStringSubmatch returns the full match at [0], then groups at [1]...[n]. + // We check which group captured. + switch { + case matches[1] != "": + return true, ReasonHTTP429 + case matches[2] != "": + return true, ReasonHTTP503 + case matches[3] != "": + return true, ReasonHTTP504 + case matches[4] != "": + return true, ReasonHTTPRetryAfter + case matches[5] != "": + return true, ReasonTextCircuitBreaker + case matches[6] != "": + return true, ReasonTextOverload + case matches[7] != "": + return true, ReasonTextBackpressure + } + + return false, "" +} + +// IsRetryableError is a convenience wrapper. 
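+// It drops the Reason and keeps only the boolean, e.g.
+//
+//	retryable := IsRetryableError(err)
+//
+// for callers that do not record the classification.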
+func IsRetryableError(err error) bool { + ok, _ := ClassifyErrorReason(err) + return ok +} diff --git a/exporter/exporterhelper/internal/experr/retryable_error_test.go b/exporter/exporterhelper/internal/experr/retryable_error_test.go new file mode 100644 index 00000000000..f933efe17a4 --- /dev/null +++ b/exporter/exporterhelper/internal/experr/retryable_error_test.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package experr // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr" + +import ( + "context" + "errors" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ----- helpers / fakes ----- + +// netTimeoutErr implements net.Error and returns Timeout()==true. +type netTimeoutErr struct{} + +func (netTimeoutErr) Error() string { return "i/o timeout" } +func (netTimeoutErr) Timeout() bool { return true } +func (netTimeoutErr) Temporary() bool { return false } // kept for older interfaces + +// httpRespErr is an error that carries an *http.Response and satisfies the +// minimal interface the classifier uses via errors.As. +type httpRespErr struct { + resp *http.Response + msg string +} + +func (e httpRespErr) Error() string { return e.msg } +func (e httpRespErr) Response() *http.Response { return e.resp } + +// lower builds a simple error with the provided message (for fallback checks). +func lower(msg string) error { return errors.New(strings.ToLower(msg)) } + +// ----- tests ----- + +func TestClassifyErrorReason_TypedContextDeadline(t *testing.T) { + ok, reason := ClassifyErrorReason(context.DeadlineExceeded) + assert.True(t, ok) + assert.Equal(t, ReasonDeadline, reason) +} + +func TestClassifyErrorReason_TypedNetTimeout(t *testing.T) { + ok, reason := ClassifyErrorReason(netTimeoutErr{}) + assert.True(t, ok) + assert.Equal(t, ReasonNetTimeout, reason) +} + +func TestClassifyErrorReason_TypedGRPC_StatusCodes(t *testing.T) { + cases := []struct { + err error + reason Reason + }{ + {status.Error(codes.ResourceExhausted, "overloaded"), ReasonGRPCResourceExhausted}, + {status.Error(codes.Unavailable, "unavailable"), ReasonGRPCUnavailable}, + {status.Error(codes.DeadlineExceeded, "deadline"), ReasonGRPCDeadline}, + } + + for _, tc := range cases { + ok, reason := ClassifyErrorReason(tc.err) + assert.True(t, ok, tc.err) + assert.Equal(t, tc.reason, reason) + } +} + +func TestClassifyErrorReason_TypedHTTP_StatusCodes(t *testing.T) { + cases := []struct { + code int + header http.Header + reason Reason + }{ + {http.StatusTooManyRequests, nil, ReasonHTTP429}, + {http.StatusServiceUnavailable, nil, ReasonHTTP503}, + {http.StatusGatewayTimeout, nil, ReasonHTTP504}, + {http.StatusOK, http.Header{"Retry-After": []string{"10"}}, ReasonHTTPRetryAfter}, + } + + for _, tc := range cases { + resp := &http.Response{StatusCode: tc.code, Header: tc.header} + err := httpRespErr{resp: resp, msg: "transport error"} + ok, reason := ClassifyErrorReason(err) + assert.True(t, ok, tc) + assert.Equal(t, tc.reason, reason, tc) + } +} + +func TestClassifyErrorReason_FallbackRegex_Positive(t *testing.T) { + // These exercise the regex fallback path. 
+ cases := []struct { + err error + reason Reason + }{ + {errors.New("HTTP 429 Too Many Requests"), ReasonHTTP429}, + {lower("too many requests"), ReasonHTTP429}, + {errors.New("some 429 error"), ReasonHTTP429}, + + {errors.New("received HTTP 503 Service Unavailable"), ReasonHTTP503}, + {lower("service unavailable"), ReasonHTTP503}, + {errors.New("503 blah"), ReasonHTTP503}, + + {errors.New("HTTP 504 Gateway Timeout"), ReasonHTTP504}, + {lower("gateway timeout"), ReasonHTTP504}, + {lower("upstream timeout"), ReasonHTTP504}, + {errors.New("got a 504"), ReasonHTTP504}, + + {lower("retry-after header present"), ReasonHTTPRetryAfter}, + {errors.New("Retry-After: 30"), ReasonHTTPRetryAfter}, + + {lower("circuit breaker open"), ReasonTextCircuitBreaker}, + {errors.New("Circuit Breaker"), ReasonTextCircuitBreaker}, + + {lower("system overload"), ReasonTextOverload}, + {errors.New("OVERLOAD"), ReasonTextOverload}, + + {lower("backpressure detected"), ReasonTextBackpressure}, + {errors.New("BACKPRESSURE"), ReasonTextBackpressure}, + } + + for _, tc := range cases { + ok, reason := ClassifyErrorReason(tc.err) + assert.True(t, ok, tc.err) + assert.Equal(t, tc.reason, reason, tc.err) + } +} + +func TestIsRetryableError_Positive_Smoke(t *testing.T) { + // A quick smoke test through the convenience wrapper. + assert.True(t, IsRetryableError(status.Error(codes.ResourceExhausted, "x"))) + assert.True(t, IsRetryableError(netTimeoutErr{})) + assert.True(t, IsRetryableError(context.DeadlineExceeded)) + assert.True(t, IsRetryableError(errors.New("429"))) + assert.True(t, IsRetryableError(errors.New("service unavailable"))) + assert.True(t, IsRetryableError(errors.New("gateway timeout"))) + assert.True(t, IsRetryableError(errors.New("OVERLOAD"))) +} + +func TestIsRetryableError_NegativeCases(t *testing.T) { + cases := []error{ + nil, + errors.New("some transient error"), + errors.New("internal server error"), // intentionally not treated as retryable + errors.New("permission denied"), // not retryable + errors.New("bad gateway (502)"), // not in our fallback allowlist + errors.New("request timeout (408)"), // not in our fallback allowlist + errors.New("proxy timeout (599)"), // not in our fallback allowlist + } + + for _, err := range cases { + assert.False(t, IsRetryableError(err), "expected not retryable: %v", err) + } +} diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index 510cb29002d..37b7e022b81 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -28,6 +28,13 @@ type TelemetryBuilder struct { meter metric.Meter mu sync.Mutex registrations []metric.Registration + ExporterArcAcquireWaitMs metric.Int64Histogram + ExporterArcBackoffEvents metric.Int64Counter + ExporterArcFailures metric.Int64Counter + ExporterArcLimit metric.Int64ObservableGauge + ExporterArcLimitChanges metric.Int64Counter + ExporterArcPermitsInUse metric.Int64ObservableGauge + ExporterArcRttMs metric.Int64Histogram ExporterEnqueueFailedLogRecords metric.Int64Counter ExporterEnqueueFailedMetricPoints metric.Int64Counter ExporterEnqueueFailedSpans metric.Int64Counter @@ -54,6 +61,36 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { tbof(mb) } +// RegisterExporterArcLimitCallback sets callback for observable ExporterArcLimit metric. 
+func (builder *TelemetryBuilder) RegisterExporterArcLimitCallback(cb metric.Int64Callback) error { + reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + cb(ctx, &observerInt64{inst: builder.ExporterArcLimit, obs: o}) + return nil + }, builder.ExporterArcLimit) + if err != nil { + return err + } + builder.mu.Lock() + defer builder.mu.Unlock() + builder.registrations = append(builder.registrations, reg) + return nil +} + +// RegisterExporterArcPermitsInUseCallback sets callback for observable ExporterArcPermitsInUse metric. +func (builder *TelemetryBuilder) RegisterExporterArcPermitsInUseCallback(cb metric.Int64Callback) error { + reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + cb(ctx, &observerInt64{inst: builder.ExporterArcPermitsInUse, obs: o}) + return nil + }, builder.ExporterArcPermitsInUse) + if err != nil { + return err + } + builder.mu.Lock() + defer builder.mu.Unlock() + builder.registrations = append(builder.registrations, reg) + return nil +} + // RegisterExporterQueueCapacityCallback sets callback for observable ExporterQueueCapacity metric. func (builder *TelemetryBuilder) RegisterExporterQueueCapacityCallback(cb metric.Int64Callback) error { reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { @@ -112,6 +149,48 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error + builder.ExporterArcAcquireWaitMs, err = builder.meter.Int64Histogram( + "otelcol_exporter_arc_acquire_wait_ms", + metric.WithDescription("Time a worker waited to acquire an ARC permit. [Alpha]"), + metric.WithUnit("ms"), + ) + errs = errors.Join(errs, err) + builder.ExporterArcBackoffEvents, err = builder.meter.Int64Counter( + "otelcol_exporter_arc_backoff_events", + metric.WithDescription("Number of ARC backoff (shrink) events triggered by error or RTT signal. [Alpha]"), + metric.WithUnit("{events}"), + ) + errs = errors.Join(errs, err) + builder.ExporterArcFailures, err = builder.meter.Int64Counter( + "otelcol_exporter_arc_failures", + metric.WithDescription("Number of requests considered failures by ARC (feeds adaptive shrink). [Alpha]"), + metric.WithUnit("{requests}"), + ) + errs = errors.Join(errs, err) + builder.ExporterArcLimit, err = builder.meter.Int64ObservableGauge( + "otelcol_exporter_arc_limit", + metric.WithDescription("Current ARC dynamic concurrency limit. [Alpha]"), + metric.WithUnit("{permits}"), + ) + errs = errors.Join(errs, err) + builder.ExporterArcLimitChanges, err = builder.meter.Int64Counter( + "otelcol_exporter_arc_limit_changes", + metric.WithDescription("Number of times ARC changed its concurrency limit. [Alpha]"), + metric.WithUnit("{events}"), + ) + errs = errors.Join(errs, err) + builder.ExporterArcPermitsInUse, err = builder.meter.Int64ObservableGauge( + "otelcol_exporter_arc_permits_in_use", + metric.WithDescription("Number of permits currently acquired. [Alpha]"), + metric.WithUnit("{permits}"), + ) + errs = errors.Join(errs, err) + builder.ExporterArcRttMs, err = builder.meter.Int64Histogram( + "otelcol_exporter_arc_rtt_ms", + metric.WithDescription("Request round-trip-time measured by ARC (from permit acquire to release). 
[Alpha]"), + metric.WithUnit("ms"), + ) + errs = errors.Join(errs, err) builder.ExporterEnqueueFailedLogRecords, err = builder.meter.Int64Counter( "otelcol_exporter_enqueue_failed_log_records", metric.WithDescription("Number of log records failed to be added to the sending queue. [Alpha]"), diff --git a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go index 7096bc471f7..1daff068bed 100644 --- a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go +++ b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go @@ -12,6 +12,112 @@ import ( "go.opentelemetry.io/collector/component/componenttest" ) +func AssertEqualExporterArcAcquireWaitMs(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_acquire_wait_ms", + Description: "Time a worker waited to acquire an ARC permit. [Alpha]", + Unit: "ms", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_acquire_wait_ms") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterArcBackoffEvents(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_backoff_events", + Description: "Number of ARC backoff (shrink) events triggered by error or RTT signal. [Alpha]", + Unit: "{events}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_backoff_events") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterArcFailures(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_failures", + Description: "Number of requests considered failures by ARC (feeds adaptive shrink). [Alpha]", + Unit: "{requests}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_failures") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterArcLimit(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_limit", + Description: "Current ARC dynamic concurrency limit. [Alpha]", + Unit: "{permits}", + Data: metricdata.Gauge[int64]{ + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_limit") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterArcLimitChanges(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_limit_changes", + Description: "Number of times ARC changed its concurrency limit. 
[Alpha]", + Unit: "{events}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_limit_changes") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterArcPermitsInUse(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_permits_in_use", + Description: "Number of permits currently acquired. [Alpha]", + Unit: "{permits}", + Data: metricdata.Gauge[int64]{ + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_permits_in_use") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterArcRttMs(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_arc_rtt_ms", + Description: "Request round-trip-time measured by ARC (from permit acquire to release). [Alpha]", + Unit: "ms", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_arc_rtt_ms") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualExporterEnqueueFailedLogRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_exporter_enqueue_failed_log_records", diff --git a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go index 838a03f504a..1c4c163a384 100644 --- a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go +++ b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -20,6 +20,14 @@ func TestSetupTelemetry(t *testing.T) { tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) require.NoError(t, err) defer tb.Shutdown() + require.NoError(t, tb.RegisterExporterArcLimitCallback(func(_ context.Context, observer metric.Int64Observer) error { + observer.Observe(1) + return nil + })) + require.NoError(t, tb.RegisterExporterArcPermitsInUseCallback(func(_ context.Context, observer metric.Int64Observer) error { + observer.Observe(1) + return nil + })) require.NoError(t, tb.RegisterExporterQueueCapacityCallback(func(_ context.Context, observer metric.Int64Observer) error { observer.Observe(1) return nil @@ -28,6 +36,11 @@ func TestSetupTelemetry(t *testing.T) { observer.Observe(1) return nil })) + tb.ExporterArcAcquireWaitMs.Record(context.Background(), 1) + tb.ExporterArcBackoffEvents.Add(context.Background(), 1) + tb.ExporterArcFailures.Add(context.Background(), 1) + tb.ExporterArcLimitChanges.Add(context.Background(), 1) + tb.ExporterArcRttMs.Record(context.Background(), 1) tb.ExporterEnqueueFailedLogRecords.Add(context.Background(), 1) tb.ExporterEnqueueFailedMetricPoints.Add(context.Background(), 1) tb.ExporterEnqueueFailedSpans.Add(context.Background(), 1) @@ -39,6 +52,27 @@ func TestSetupTelemetry(t *testing.T) { tb.ExporterSentLogRecords.Add(context.Background(), 1) tb.ExporterSentMetricPoints.Add(context.Background(), 1) tb.ExporterSentSpans.Add(context.Background(), 1) + AssertEqualExporterArcAcquireWaitMs(t, testTel, + 
[]metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(), + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterArcBackoffEvents(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterArcFailures(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterArcLimit(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterArcLimitChanges(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterArcPermitsInUse(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterArcRttMs(t, testTel, + []metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(), + metricdatatest.IgnoreTimestamp()) AssertEqualExporterEnqueueFailedLogRecords(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index c49c7265647..fc1886c9a39 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -5,11 +5,20 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" + "errors" + "fmt" + "strings" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configoptional" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/arc" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender" @@ -32,6 +41,7 @@ func NewDefaultQueueConfig() queuebatch.Config { Sizer: request.SizerTypeItems, MinSize: 8192, }), + Arc: arc.DefaultConfig(), } } @@ -41,6 +51,7 @@ func NewQueueSender( exportFailureMessage string, next sender.Sender[request.Request], ) (sender.Sender[request.Request], error) { + var arcCtl *arc.Controller exportFunc := func(ctx context.Context, req request.Request) error { // Have to read the number of items before sending the request since the request can // be modified by the downstream components like the batcher. @@ -53,5 +64,88 @@ func NewQueueSender( return nil } - return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc) + // If ARC is enabled, wrap the export function with ARC logic. + if qCfg.Arc.Enabled { + tel, err := metadata.NewTelemetryBuilder(qSet.Telemetry) + if err != nil { + return nil, fmt.Errorf("failed to create telemetry builder for ARC: %w", err) + } + arcCtl = arc.NewController(qCfg.Arc, tel, qSet.ID, qSet.Signal) + + // Create the attributes for sync ARC metrics + exporterAttr := attribute.String("exporter", qSet.ID.String()) + dataTypeAttr := attribute.String("data_type", strings.ToLower(qSet.Signal.String())) + arcAttrs := metric.WithAttributes(exporterAttr, dataTypeAttr) + + origExportFunc := exportFunc + exportFunc = func(ctx context.Context, req request.Request) error { + startWait := time.Now() + if !arcCtl.Acquire(ctx) { + // This context is from the queue, so it's a background context. 
+ // If Acquire fails, it means context was cancelled, which shouldn't happen + // unless shutdown is happening. + + // Record wait time even on failure + waitMs := time.Since(startWait).Milliseconds() + tel.ExporterArcAcquireWaitMs.Record(ctx, waitMs, arcAttrs) + + // Record failure + tel.ExporterArcFailures.Add(ctx, 1, arcAttrs) + + if err := ctx.Err(); err != nil { + return err + } + + return experr.NewShutdownErr(errors.New("arc semaphore closed")) + } + waitMs := time.Since(startWait).Milliseconds() + tel.ExporterArcAcquireWaitMs.Record(ctx, waitMs, arcAttrs) + + arcCtl.StartRequest() + startTime := time.Now() + err := origExportFunc(ctx, req) + rtt := time.Since(startTime) + + isBackpressure := experr.IsRetryableError(err) + + isSuccess := err == nil + + arcCtl.ReleaseWithSample(ctx, rtt, isSuccess, isBackpressure) + return err + } + } + + qbs, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc) + if err != nil { + return nil, err + } + + // If ARC is enabled, we need to chain its shutdown. + if arcCtl != nil { + return &queueSenderWithArc{ + QueueBatch: qbs, + arcCtl: arcCtl, + }, nil + } + + return qbs, nil +} + +// queueSenderWithArc combines the QueueBatch sender with an arc.Controller +// to ensure both are started and shut down correctly. +type queueSenderWithArc struct { + *queuebatch.QueueBatch + arcCtl *arc.Controller +} + +// Shutdown shuts down both the queue/batcher and the ARC controller. +func (qs *queueSenderWithArc) Shutdown(ctx context.Context) error { + qs.arcCtl.Shutdown() + return qs.QueueBatch.Shutdown(ctx) +} + +// Start starts both the queue/batcher and the ARC controller. +// Note: ARC controller doesn't have a Start, so this just passes through. +func (qs *queueSenderWithArc) Start(ctx context.Context, host component.Host) error { + return qs.QueueBatch.Start(ctx, host) } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index ad6020b5a1f..34f88a50f77 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -6,15 +6,22 @@ package internal import ( "context" "errors" + "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" @@ -42,6 +49,90 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) { assert.Equal(t, "Exporting failed. 
Dropping data.", observed.All()[0].Message) } +func TestQueueSender_ArcAcquireWaitMetric_Failure(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qSet := queuebatch.AllSettings[request.Request]{ + Signal: pipeline.SignalMetrics, + ID: component.NewID(exportertest.NopType), + Telemetry: tt.NewTelemetrySettings(), + } + qCfg := NewDefaultQueueConfig() + qCfg.Arc.Enabled = true + qCfg.Arc.InitialLimit = 1 + qCfg.Batch = configoptional.Optional[queuebatch.BatchConfig]{} // This disables batching + qCfg.WaitForResult = true + + // Create a sender that blocks + blocker := make(chan struct{}) + mockSender := sender.NewSender(func(context.Context, request.Request) error { + <-blocker // Block the first request + return nil + }) + + be, err := NewQueueSender(qSet, qCfg, "", mockSender) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + // Send first request (will block in sender) + var sendWg sync.WaitGroup + sendWg.Add(1) + go func() { + defer sendWg.Done() + assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1})) + }() + + // Wait for the first request to acquire the permit + time.Sleep(50 * time.Millisecond) + + // Send second request with a context that will be cancelled + ctx, cancel := context.WithCancel(context.Background()) + sendWg.Add(1) + go func() { + defer sendWg.Done() + // This send will block in arcCtl.Acquire() and then fail + sendErr := be.Send(ctx, &requesttest.FakeRequest{Items: 1}) + assert.Error(t, sendErr) + // The error should be context.Canceled, as the context passed to Send is canceled. + assert.ErrorIs(t, sendErr, context.Canceled) + }() + + // Wait for the second request to block in Acquire + time.Sleep(50 * time.Millisecond) + + // Cancel the context + cancel() + + // Unblock the first sender + close(blocker) + + // Wait for both sends to complete + sendWg.Wait() + + // Shutdown to ensure all telemetry is flushed + require.NoError(t, be.Shutdown(context.Background())) + + // 4. 
Verify the acquire wait metric + metrics, err := tt.GetMetric("otelcol_exporter_arc_acquire_wait_ms") + require.NoError(t, err) + points := metrics.Data.(metricdata.Histogram[int64]).DataPoints + require.Len(t, points, 1) + + // Both requests should have recorded a wait time (1 success, 1 failure) + assert.Equal(t, uint64(2), points[0].Count) + maxVal, ok := points[0].Max.Value() + require.True(t, ok, "Max value should be valid") + assert.Greater(t, maxVal, int64(40), "Max wait time (from failed acquire) should be significant") + + // Verify the arc failures metric + metrics, err = tt.GetMetric("otelcol_exporter_arc_failures") + require.NoError(t, err) + pointsF := metrics.Data.(metricdata.Sum[int64]).DataPoints + require.Len(t, pointsF, 1) + assert.Equal(t, int64(1), pointsF[0].Value) +} + func TestQueueConfig_Validate(t *testing.T) { qCfg := NewDefaultQueueConfig() require.NoError(t, qCfg.Validate()) @@ -57,3 +148,69 @@ func TestQueueConfig_Validate(t *testing.T) { qCfg.Enabled = false assert.NoError(t, qCfg.Validate()) } + +func TestArc_BackpressureTriggersShrinkAndMetrics(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qSet := queuebatch.AllSettings[request.Request]{ + Signal: pipeline.SignalTraces, + ID: component.NewID(exportertest.NopType), + Telemetry: tt.NewTelemetrySettings(), + } + + qCfg := NewDefaultQueueConfig() + qCfg.WaitForResult = true + qCfg.Batch = configoptional.Optional[queuebatch.BatchConfig]{} // disable batching + qCfg.Arc.Enabled = true + qCfg.Arc.InitialLimit = 3 + qCfg.Arc.MaxConcurrency = 10 + + // First call: explicit backpressure (gRPC ResourceExhausted). Second call: success. + var call int32 + be, err := NewQueueSender( + qSet, + qCfg, + qSet.ID.String(), + sender.NewSender(func(context.Context, request.Request) error { + c := atomic.AddInt32(&call, 1) + if c == 1 { + // 1. Simulates a backpressure error + return status.Error(codes.ResourceExhausted, "overload") + } + return nil + }), + ) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + // First send (backpressure) + require.Error(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1})) + // Ensure control step boundary is crossed (~300ms default period) + time.Sleep(350 * time.Millisecond) + // Second send (success) triggers control step and metrics flush + require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1})) + require.NoError(t, be.Shutdown(context.Background())) + + // 2. Asserts otelcol_exporter_arc_backoff_events increments + m, err := tt.GetMetric("otelcol_exporter_arc_backoff_events") + require.NoError(t, err) + backoffPoints := m.Data.(metricdata.Sum[int64]).DataPoints + var backoffSum int64 + for _, dp := range backoffPoints { + backoffSum += dp.Value + } + assert.GreaterOrEqual(t, backoffSum, int64(1), "expected at least one backoff event") + + // 3. 
Asserts otelcol_exporter_arc_limit_changes{direction="down"} increments + m, err = tt.GetMetric("otelcol_exporter_arc_limit_changes") + require.NoError(t, err) + lcPoints := m.Data.(metricdata.Sum[int64]).DataPoints + var down int64 + for _, dp := range lcPoints { + if v, ok := dp.Attributes.Value("direction"); ok && v.AsString() == "down" { + down += dp.Value + } + } + assert.GreaterOrEqual(t, down, int64(1), "expected at least one limit-down change") +} diff --git a/exporter/exporterhelper/internal/queuebatch/config.go b/exporter/exporterhelper/internal/queuebatch/config.go index ecef3ce3040..fb1516273e0 100644 --- a/exporter/exporterhelper/internal/queuebatch/config.go +++ b/exporter/exporterhelper/internal/queuebatch/config.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/arc" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" ) @@ -46,6 +47,9 @@ type Config struct { // BatchConfig it configures how the requests are consumed from the queue and batch together during consumption. Batch configoptional.Optional[BatchConfig] `mapstructure:"batch"` + + // Arc enables the Adaptive Concurrency Limiter. + Arc arc.Config `mapstructure:"arc"` } func (cfg *Config) Unmarshal(conf *confmap.Conf) error { diff --git a/exporter/exporterhelper/metadata.yaml b/exporter/exporterhelper/metadata.yaml index baa606a08fc..5bfd8b0426b 100644 --- a/exporter/exporterhelper/metadata.yaml +++ b/exporter/exporterhelper/metadata.yaml @@ -11,8 +11,87 @@ status: stability: beta: [traces, metrics, logs] +attributes: + direction: + description: "up or down" + type: string + enum: ["up","down"] + telemetry: metrics: + exporter_arc_acquire_wait_ms: + enabled: true + stability: + level: alpha + description: Time a worker waited to acquire an ARC permit. + unit: "ms" + histogram: + value_type: int + explicit: + buckets: [0, 1, 2, 5, 10, 20, 50, 100, 250, 500, 1000, 2000, 5000] + + exporter_arc_backoff_events: + enabled: true + stability: + level: alpha + description: Number of ARC backoff (shrink) events triggered by error or RTT signal. + unit: "{events}" + sum: + value_type: int + monotonic: true + + exporter_arc_failures: + enabled: true + stability: + level: alpha + description: Number of requests considered failures by ARC (feeds adaptive shrink). + unit: "{requests}" + sum: + value_type: int + monotonic: true + + exporter_arc_limit: + enabled: true + stability: + level: alpha + description: Current ARC dynamic concurrency limit. + unit: "{permits}" + gauge: + value_type: int + async: true + + exporter_arc_limit_changes: + enabled: true + stability: + level: alpha + description: Number of times ARC changed its concurrency limit. + unit: "{events}" + sum: + value_type: int + monotonic: true + attributes: [direction] + + exporter_arc_permits_in_use: + enabled: true + stability: + level: alpha + description: Number of permits currently acquired. + unit: "{permits}" + gauge: + value_type: int + async: true + + exporter_arc_rtt_ms: + enabled: true + stability: + level: alpha + description: Request round-trip-time measured by ARC (from permit acquire to release). 
+ unit: "ms" + histogram: + value_type: int + explicit: + buckets: [0, 5, 10, 20, 50, 100, 200, 400, 800, 1600, 3200, 6400] + exporter_enqueue_failed_log_records: enabled: true stability: @@ -181,4 +260,4 @@ telemetry: unit: "{spans}" sum: value_type: int - monotonic: true + monotonic: true \ No newline at end of file