26 changes: 26 additions & 0 deletions .chloggen/13956.yaml
@@ -0,0 +1,26 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pkg/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add retry-dropped item metrics and a retries-exhausted error marker to the exporter helper retry sender.

# One or more tracking issues or pull requests related to the change
issues: [13956]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |-
New counters `otelcol_exporter_retry_dropped_{spans,metric_points,log_records}` count items discarded after retries are exhausted, and `IsRetriesExhaustedErr` reports whether an error marks that terminal retry outcome.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
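
For orientation (not part of this diff), here is a minimal sketch of how code inside the collector module could branch on the new marker; the helper `classifySendError`, its logger, and the `items` parameter are hypothetical illustration only:

```go
package internal // hypothetical placement inside the collector module

import (
	"go.uber.org/zap"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
)

// classifySendError is a hypothetical helper: it separates the terminal
// retries-exhausted outcome (also counted by the new
// otelcol_exporter_retry_dropped_* counters) from ordinary send failures.
func classifySendError(logger *zap.Logger, err error, items int) {
	switch {
	case err == nil:
		return
	case experr.IsRetriesExhaustedErr(err):
		logger.Error("dropping items after exhausting retries",
			zap.Int("items", items), zap.Error(err))
	default:
		logger.Error("send failed", zap.Error(err))
	}
}
```

Note that `experr` is an internal package, so this check is only available to code within the collector module.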
24 changes: 24 additions & 0 deletions exporter/exporterhelper/documentation.md
@@ -62,6 +62,30 @@ Current size of the retry queue (in batches). [alpha]
| ---- | ----------- | ---------- | --------- |
| {batches} | Gauge | Int | alpha |

### otelcol_exporter_retry_dropped_log_records

Number of log records dropped after exhausting configured retries. [alpha]

| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| {records} | Sum | Int | true | alpha |

### otelcol_exporter_retry_dropped_metric_points

Number of metric points dropped after exhausting configured retries. [alpha]

| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| {datapoints} | Sum | Int | true | alpha |

### otelcol_exporter_retry_dropped_spans

Number of spans dropped after exhausting configured retries. [alpha]

| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| {spans} | Sum | Int | true | alpha |

### otelcol_exporter_send_failed_log_records

Number of log records in failed attempts to send to destination. [alpha]
25 changes: 22 additions & 3 deletions exporter/exporterhelper/internal/experr/err.go
@@ -3,9 +3,7 @@

package experr // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"

import (
"errors"
)
import "errors"

type shutdownErr struct {
err error
@@ -27,3 +25,24 @@ func IsShutdownErr(err error) bool {
var sdErr shutdownErr
return errors.As(err, &sdErr)
}

type retriesExhaustedErr struct {
err error
}

func NewRetriesExhaustedErr(err error) error {
return retriesExhaustedErr{err: err}
}

func (r retriesExhaustedErr) Error() string {
return "retries exhausted: " + r.err.Error()
}

func (r retriesExhaustedErr) Unwrap() error {
return r.err
}

func IsRetriesExhaustedErr(err error) bool {
var reErr retriesExhaustedErr
return errors.As(err, &reErr)
}
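
A hedged sketch of the wrapper's semantics, written as a Go example test that could live alongside the package (the example function is illustrative, not part of the diff): the marker is detectable with `IsRetriesExhaustedErr`, while the original cause stays reachable through `Unwrap`.

```go
package experr_test

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
)

// ExampleNewRetriesExhaustedErr is an illustrative sketch of the wrapper's behavior.
func ExampleNewRetriesExhaustedErr() {
	original := errors.New("503 from backend")
	wrapped := experr.NewRetriesExhaustedErr(original)

	fmt.Println(experr.IsRetriesExhaustedErr(wrapped)) // marker is detectable
	fmt.Println(errors.Is(wrapped, original))          // cause still reachable via Unwrap
	fmt.Println(wrapped.Error())

	// Output:
	// true
	// true
	// retries exhausted: 503 from backend
}
```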
12 changes: 12 additions & 0 deletions exporter/exporterhelper/internal/experr/err_test.go
@@ -22,3 +22,15 @@ func TestIsShutdownErr(t *testing.T) {
err = NewShutdownErr(err)
require.True(t, IsShutdownErr(err))
}

func TestNewRetriesExhaustedErr(t *testing.T) {
err := NewRetriesExhaustedErr(errors.New("another error"))
assert.Equal(t, "retries exhausted: another error", err.Error())
}

func TestIsRetriesExhaustedErr(t *testing.T) {
err := errors.New("testError")
require.False(t, IsRetriesExhaustedErr(err))
err = NewRetriesExhaustedErr(err)
require.True(t, IsRetriesExhaustedErr(err))
}

Some generated files are not rendered by default.

23 changes: 16 additions & 7 deletions exporter/exporterhelper/internal/obs_report_sender.go
@@ -13,6 +13,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -40,13 +41,14 @@ type obsReportSender[K request.Request] struct {
component.StartFunc
component.ShutdownFunc

spanName string
tracer trace.Tracer
spanAttrs trace.SpanStartEventOption
metricAttr metric.MeasurementOption
itemsSentInst metric.Int64Counter
itemsFailedInst metric.Int64Counter
next sender.Sender[K]
spanName string
tracer trace.Tracer
spanAttrs trace.SpanStartEventOption
metricAttr metric.MeasurementOption
itemsSentInst metric.Int64Counter
itemsFailedInst metric.Int64Counter
itemsRetryDroppedInst metric.Int64Counter
next sender.Sender[K]
}

func newObsReportSender[K request.Request](set exporter.Settings, signal pipeline.Signal, next sender.Sender[K]) (sender.Sender[K], error) {
@@ -70,14 +72,17 @@ func newObsReportSender[K request.Request](set exporter.Settings, signal pipelin
case pipeline.SignalTraces:
or.itemsSentInst = telemetryBuilder.ExporterSentSpans
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans
or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedSpans

case pipeline.SignalMetrics:
or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints
or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedMetricPoints

case pipeline.SignalLogs:
or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords
or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedLogRecords
}

return or, nil
@@ -116,6 +121,10 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numLogRecords int, err
if ors.itemsFailedInst != nil {
ors.itemsFailedInst.Add(ctx, numFailedToSend, ors.metricAttr)
}
// Count drops after retries were exhausted.
if err != nil && ors.itemsRetryDroppedInst != nil && experr.IsRetriesExhaustedErr(err) {
ors.itemsRetryDroppedInst.Add(ctx, numFailedToSend, ors.metricAttr)
}

span := trace.SpanFromContext(ctx)
defer span.End()
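
A simplified, hypothetical restatement of the accounting `endOp` now performs (the function name and signature are illustrative): failed items always increment the send-failed counter, and only errors carrying the retries-exhausted marker additionally increment the retry-dropped counter.

```go
package internal // hypothetical placement inside the collector module

import (
	"context"

	"go.opentelemetry.io/otel/metric"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
)

// recordFailedItems is a hypothetical, stripped-down version of the accounting in endOp.
func recordFailedItems(ctx context.Context, failed int64, err error,
	sendFailed, retryDropped metric.Int64Counter, attrs metric.MeasurementOption,
) {
	if err == nil || failed == 0 {
		return
	}
	if sendFailed != nil {
		// Every failed item counts as a send failure.
		sendFailed.Add(ctx, failed, attrs)
	}
	if retryDropped != nil && experr.IsRetriesExhaustedErr(err) {
		// Items whose error carries the retries-exhausted marker are also counted as retry drops.
		retryDropped.Add(ctx, failed, attrs)
	}
}
```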
38 changes: 38 additions & 0 deletions exporter/exporterhelper/internal/obs_report_sender_test.go
@@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -31,6 +32,43 @@ var (
errFake = errors.New("errFake")
)

func TestExportTraceRetryDroppedMetric(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

obsrep, err := newObsReportSender(
exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
pipeline.SignalTraces,
sender.NewSender(func(context.Context, request.Request) error {
return experr.NewRetriesExhaustedErr(errFake)
}),
)
require.NoError(t, err)

req := &requesttest.FakeRequest{Items: 7}
sendErr := obsrep.Send(context.Background(), req)
require.Error(t, sendErr)
require.True(t, experr.IsRetriesExhaustedErr(sendErr))

wantAttrs := attribute.NewSet(attribute.String("exporter", exporterID.String()))

metadatatest.AssertEqualExporterSendFailedSpans(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: wantAttrs,
Value: int64(req.Items),
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())

metadatatest.AssertEqualExporterRetryDroppedSpans(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: wantAttrs,
Value: int64(req.Items),
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}

func TestExportTraceDataOp(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
16 changes: 12 additions & 4 deletions exporter/exporterhelper/internal/retry_sender.go
@@ -79,10 +79,17 @@ func (rs *retrySender) Send(ctx context.Context, req request.Request) error {
}
span := trace.SpanFromContext(ctx)
retryNum := int64(0)
retried := false
var maxElapsedTime time.Time
if rs.cfg.MaxElapsedTime > 0 {
maxElapsedTime = time.Now().Add(rs.cfg.MaxElapsedTime)
}
wrapRetryErr := func(e error) error {
if retried {
return experr.NewRetriesExhaustedErr(e)
}
return e
}
for {
span.AddEvent(
"Sending request.",
@@ -104,7 +111,7 @@

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
return fmt.Errorf("no more retries left: %w", err)
return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
}

throttleErr := throttleRetry{}
@@ -115,13 +122,13 @@
nextRetryTime := time.Now().Add(backoffDelay)
if !maxElapsedTime.IsZero() && maxElapsedTime.Before(nextRetryTime) {
// The delay is longer than the maxElapsedTime.
return fmt.Errorf("no more retries left: %w", err)
return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
}

if deadline, has := ctx.Deadline(); has && deadline.Before(nextRetryTime) {
// The delay is longer than the deadline. There is no point in
// waiting for cancelation.
return fmt.Errorf("request will be cancelled before next retry: %w", err)
return wrapRetryErr(fmt.Errorf("request will be cancelled before next retry: %w", err))
}

backoffDelayStr := backoffDelay.String()
@@ -140,10 +147,11 @@
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
select {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out: %w", err)
return wrapRetryErr(fmt.Errorf("request is cancelled or timed out: %w", err))
case <-rs.stopCh:
return experr.NewShutdownErr(err)
case <-time.After(backoffDelay):
retried = true
}
}
}
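
One detail worth calling out: `wrapRetryErr` attaches the retries-exhausted marker only once `retried` has been set, i.e. after at least one backoff wait, so a request that fails permanently on its very first attempt is not reported as a retry drop. A standalone, hypothetical sketch of that gating pattern (the function and its parameters are illustrative, not the actual sender):

```go
package internal // hypothetical placement inside the collector module

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
)

// sendWithRetries is a hypothetical illustration of the gating used in Send:
// the retries-exhausted marker is attached only if at least one retry happened.
func sendWithRetries(attempt func() error, maxAttempts int) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	retried := false
	wrapRetryErr := func(e error) error {
		if retried {
			return experr.NewRetriesExhaustedErr(e)
		}
		return e
	}

	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = attempt(); err == nil {
			return nil
		}
		if i == maxAttempts-1 {
			// Out of attempts; mark as retries-exhausted only if we actually retried.
			return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
		}
		retried = true // a retry is about to be performed
	}
	return err
}
```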