Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pkg/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use `configoptional.Optional` for the `exporterhelper.QueueBatchConfig`

# One or more tracking issues or pull requests related to the change
issues: [14155]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
It's recommended to change the field type in your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` to keep the `enabled` subfield. Use configoptional.Some(exporterhelper.NewDefaultQueueConfig()) to enable by default. Use configoptional.Default(exporterhelper.NewDefaultQueueConfig()) to disable by default.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
3 changes: 2 additions & 1 deletion exporter/debugexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
Expand All @@ -33,7 +34,7 @@ type Config struct {
// UseInternalLogger defines whether the exporter sends the output to the collector's internal logger.
UseInternalLogger bool `mapstructure:"use_internal_logger"`

QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"`

// prevent unkeyed literal initialization
_ struct{}
Expand Down
4 changes: 2 additions & 2 deletions exporter/debugexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
Expand All @@ -25,7 +26,6 @@ func TestUnmarshalDefaultConfig(t *testing.T) {

func TestUnmarshalConfig(t *testing.T) {
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false
tests := []struct {
filename string
cfg *Config
Expand All @@ -37,7 +37,7 @@ func TestUnmarshalConfig(t *testing.T) {
Verbosity: configtelemetry.LevelDetailed,
SamplingInitial: 10,
SamplingThereafter: 50,
QueueConfig: queueCfg,
QueueConfig: configoptional.Default(queueCfg),
},
},
{
Expand Down
8 changes: 6 additions & 2 deletions exporter/debugexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/debugexporter/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -111,15 +113,17 @@ func createTestCases() []testCase {
name: "default config",
config: func() *Config {
c := createDefaultConfig().(*Config)
c.QueueConfig.QueueSize = 10
c.QueueConfig = configoptional.Some(exporterhelper.NewDefaultQueueConfig())
c.QueueConfig.Get().QueueSize = 10
return c
}(),
},
{
name: "don't use internal logger",
config: func() *Config {
cfg := createDefaultConfig().(*Config)
cfg.QueueConfig.QueueSize = 10
cfg.QueueConfig = configoptional.Some(exporterhelper.NewDefaultQueueConfig())
cfg.QueueConfig.Get().QueueSize = 10
cfg.UseInternalLogger = false
return cfg
}(),
Expand Down
6 changes: 2 additions & 4 deletions exporter/debugexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -42,15 +43,12 @@ func NewFactory() exporter.Factory {
}

func createDefaultConfig() component.Config {
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

return &Config{
Verbosity: configtelemetry.LevelBasic,
SamplingInitial: defaultSamplingInitial,
SamplingThereafter: defaultSamplingThereafter,
UseInternalLogger: true,
QueueConfig: queueCfg,
QueueConfig: configoptional.Default(exporterhelper.NewDefaultQueueConfig()),
}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/debugexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.46.0
go.opentelemetry.io/collector/component/componenttest v0.140.0
go.opentelemetry.io/collector/config/configoptional v1.46.0
go.opentelemetry.io/collector/config/configtelemetry v0.140.0
go.opentelemetry.io/collector/confmap v1.46.0
go.opentelemetry.io/collector/consumer v1.46.0
Expand Down Expand Up @@ -43,7 +44,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/collector/client v1.46.0 // indirect
go.opentelemetry.io/collector/config/configoptional v1.46.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.46.0 // indirect
go.opentelemetry.io/collector/confmap/xconfmap v0.140.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.140.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion exporter/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -22,7 +23,7 @@ var typeStr = component.MustNewType("example")

// exampleConfig holds configuration settings for the exporter.
type exampleConfig struct {
QueueSettings exporterhelper.QueueBatchConfig
QueueSettings configoptional.Optional[exporterhelper.QueueBatchConfig]
BackOffConfig configretry.BackOffConfig
}

Expand Down
17 changes: 9 additions & 8 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -47,7 +48,7 @@ type BaseExporter struct {
retryCfg configretry.BackOffConfig

queueBatchSettings queuebatch.Settings[request.Request]
queueCfg queuebatch.Config
queueCfg configoptional.Optional[queuebatch.Config]
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -82,19 +83,19 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
return nil, err
}

if be.queueCfg.Batch.HasValue() {
if be.queueCfg.HasValue() && be.queueCfg.Get().Batch.HasValue() {
// Batcher mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

if be.queueCfg.Enabled {
if be.queueCfg.HasValue() {
qSet := queuebatch.AllSettings[request.Request]{
Settings: be.queueBatchSettings,
Signal: signal,
ID: set.ID,
Telemetry: set.TelemetrySettings,
}
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender)
be.QueueSender, err = NewQueueSender(qSet, *be.queueCfg.Get(), be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default queuebatch.Config for an exporter.
// The default queuebatch.Config is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(cfg queuebatch.Config) Option {
func WithQueue(cfg configoptional.Optional[queuebatch.Config]) Option {
return func(o *BaseExporter) error {
if o.queueBatchSettings.Encoding == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
Expand All @@ -204,13 +205,13 @@ func WithQueue(cfg queuebatch.Config) Option {
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option {
func WithQueueBatch(cfg configoptional.Optional[queuebatch.Config], set queuebatch.Settings[request.Request]) Option {
return func(o *BaseExporter) error {
if !cfg.Enabled {
if !cfg.HasValue() {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
if cfg.StorageID != nil && set.Encoding == nil {
if cfg.Get().StorageID != nil && set.Encoding == nil {
return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled")
}
o.queueBatchSettings = set
Expand Down
15 changes: 6 additions & 9 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
require.NoError(t, err)
require.Nil(t, bs.queueBatchSettings.Encoding)
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(configoptional.Some(NewDefaultQueueConfig())))
require.Error(t, err)

qCfg := NewDefaultQueueConfig()
Expand All @@ -58,7 +59,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
WithQueueBatchSettings(newFakeQueueBatch()),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{}))
WithQueueBatch(configoptional.Some(qCfg), queuebatch.Settings[request.Request]{}))
require.Error(t, err)
}

Expand All @@ -72,7 +73,7 @@ func TestBaseExporterLogging(t *testing.T) {
qCfg.WaitForResult = true
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
WithQueueBatchSettings(newFakeQueueBatch()),
WithQueue(qCfg),
WithQueue(configoptional.Some(qCfg)),
WithRetry(rCfg))
require.NoError(t, err)
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -98,19 +99,15 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
queueOptions: []Option{
WithQueueBatchSettings(newFakeQueueBatch()),
func() Option {
qs := NewDefaultQueueConfig()
qs.Enabled = false
return WithQueue(qs)
return WithQueue(configoptional.None[queuebatch.Config]())
}(),
},
},
{
name: "WithRequestQueue",
queueOptions: []Option{
func() Option {
qs := NewDefaultQueueConfig()
qs.Enabled = false
return WithQueueBatch(qs, newFakeQueueBatch())
return WithQueueBatch(configoptional.None[queuebatch.Config](), newFakeQueueBatch())
}(),
},
},
Expand Down
15 changes: 8 additions & 7 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ import (
)

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
// By default, the queue stores 1000 requests of telemetry and is non-blocking when full.
// By default:
//
// - the queue stores 1000 requests of telemetry
// - is non-blocking when full
// - concurrent exports limited to 10
// - emits batches of 8192 items, timeout 200ms
func NewDefaultQueueConfig() queuebatch.Config {
return queuebatch.Config{
Enabled: true,
Sizer: request.SizerTypeRequests,
NumConsumers: 10,
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
// This can be estimated at 1-4 GB worth of maximum memory usage
// This default is probably still too high, and may be adjusted further down in a future release
Sizer: request.SizerTypeRequests,
NumConsumers: 10,
QueueSize: 1_000,
BlockOnOverflow: false,
Batch: configoptional.Default(queuebatch.BatchConfig{
Expand Down
8 changes: 5 additions & 3 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
Expand All @@ -31,8 +32,9 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
}
logger, observed := observer.New(zap.ErrorLevel)
qSet.Telemetry.Logger = zap.New(logger)
qCfg := NewDefaultQueueConfig()
be, err := NewQueueSender(
qSet, NewDefaultQueueConfig(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
qSet, qCfg, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
require.NoError(t, err)

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -54,6 +56,6 @@ func TestQueueConfig_Validate(t *testing.T) {
require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive")

// Confirm Validate doesn't return error with invalid config when feature is disabled
qCfg.Enabled = false
assert.NoError(t, qCfg.Validate())
noCfg := configoptional.None[queuebatch.Config]()
assert.NoError(t, noCfg.Validate())
}
7 changes: 0 additions & 7 deletions exporter/exporterhelper/internal/queuebatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (

// Config defines configuration for queueing and batching incoming requests.
type Config struct {
// Enabled indicates whether to not enqueue and batch before exporting.
Enabled bool `mapstructure:"enabled"`

// WaitForResult determines if incoming requests are blocked until the request is processed or not.
// Currently, this option is not available when persistent queue is configured using the storage configuration.
WaitForResult bool `mapstructure:"wait_for_result"`
Expand Down Expand Up @@ -66,10 +63,6 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {

// Validate checks if the Config is valid
func (cfg *Config) Validate() error {
if !cfg.Enabled {
return nil
}

if cfg.NumConsumers <= 0 {
return errors.New("`num_consumers` must be positive")
}
Expand Down
Loading
Loading