From d7e26715708f9e5df17b872ee003791fe7aa8b06 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 12 Nov 2025 18:31:54 +0100 Subject: [PATCH 1/7] [exporterhelper] Use configoptional.Optional for exporterhelper QueueBatchConfig --- exporter/debugexporter/config.go | 3 +- exporter/debugexporter/config_test.go | 5 +-- exporter/debugexporter/exporter_test.go | 7 +++- exporter/debugexporter/factory.go | 6 +-- exporter/debugexporter/go.mod | 2 +- .../exporterhelper/internal/base_exporter.go | 17 +++++---- .../internal/base_exporter_test.go | 13 +++---- .../exporterhelper/internal/queue_sender.go | 7 ++-- .../internal/queue_sender_test.go | 10 +++-- .../internal/queuebatch/config.go | 7 ---- .../internal/queuebatch/config_test.go | 37 ++++++++----------- .../internal/queuebatch/queue_batch_test.go | 2 - exporter/exporterhelper/logs_test.go | 2 +- exporter/exporterhelper/metrics_test.go | 2 +- exporter/exporterhelper/queue_batch.go | 3 +- exporter/exporterhelper/traces_test.go | 2 +- .../exporterhelper/xexporterhelper/go.mod | 2 +- .../xexporterhelper/new_request.go | 3 +- .../xexporterhelper/profiles_test.go | 2 +- exporter/otlpexporter/config.go | 9 +++-- exporter/otlpexporter/config_test.go | 10 ++--- exporter/otlpexporter/otlp_test.go | 12 +++--- exporter/otlphttpexporter/config.go | 7 ++-- exporter/otlphttpexporter/config_test.go | 5 +-- exporter/otlphttpexporter/factory_test.go | 2 +- internal/e2e/consume_contract_test.go | 3 +- internal/e2e/error_propagation_test.go | 5 ++- internal/e2e/otlphttp_test.go | 4 +- 28 files changed, 91 insertions(+), 98 deletions(-) diff --git a/exporter/debugexporter/config.go b/exporter/debugexporter/config.go index fbf962d7c89..acc69038821 100644 --- a/exporter/debugexporter/config.go +++ b/exporter/debugexporter/config.go @@ -7,6 +7,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/exporter/exporterhelper" ) @@ -33,7 +34,7 @@ type Config struct { // UseInternalLogger defines whether the exporter sends the output to the collector's internal logger. UseInternalLogger bool `mapstructure:"use_internal_logger"` - QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` + QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"` // prevent unkeyed literal initialization _ struct{} diff --git a/exporter/debugexporter/config_test.go b/exporter/debugexporter/config_test.go index 51b4c875e79..fa06517138d 100644 --- a/exporter/debugexporter/config_test.go +++ b/exporter/debugexporter/config_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" @@ -24,8 +25,6 @@ func TestUnmarshalDefaultConfig(t *testing.T) { } func TestUnmarshalConfig(t *testing.T) { - queueCfg := exporterhelper.NewDefaultQueueConfig() - queueCfg.Enabled = false tests := []struct { filename string cfg *Config @@ -37,7 +36,7 @@ func TestUnmarshalConfig(t *testing.T) { Verbosity: configtelemetry.LevelDetailed, SamplingInitial: 10, SamplingThereafter: 50, - QueueConfig: queueCfg, + QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](), }, }, { diff --git a/exporter/debugexporter/exporter_test.go b/exporter/debugexporter/exporter_test.go index 367f5f10eee..f95e16891a4 100644 --- a/exporter/debugexporter/exporter_test.go +++ b/exporter/debugexporter/exporter_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/exporter/debugexporter/internal/metadata" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -111,7 +112,8 @@ func createTestCases() []testCase { name: "default config", config: func() *Config { c := createDefaultConfig().(*Config) - c.QueueConfig.QueueSize = 10 + c.QueueConfig = exporterhelper.NewDefaultQueueConfig() + c.QueueConfig.Get().QueueSize = 10 return c }(), }, @@ -119,7 +121,8 @@ func createTestCases() []testCase { name: "don't use internal logger", config: func() *Config { cfg := createDefaultConfig().(*Config) - cfg.QueueConfig.QueueSize = 10 + cfg.QueueConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueConfig.Get().QueueSize = 10 cfg.UseInternalLogger = false return cfg }(), diff --git a/exporter/debugexporter/factory.go b/exporter/debugexporter/factory.go index e950d51ba7e..fe5e70fd56d 100644 --- a/exporter/debugexporter/factory.go +++ b/exporter/debugexporter/factory.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -42,15 +43,12 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { - queueCfg := exporterhelper.NewDefaultQueueConfig() - queueCfg.Enabled = false - return &Config{ Verbosity: configtelemetry.LevelBasic, SamplingInitial: defaultSamplingInitial, SamplingThereafter: defaultSamplingThereafter, UseInternalLogger: true, - QueueConfig: queueCfg, + QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](), } } diff --git a/exporter/debugexporter/go.mod b/exporter/debugexporter/go.mod index 1806b3b2663..e99f6212020 100644 --- a/exporter/debugexporter/go.mod +++ b/exporter/debugexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.45.0 go.opentelemetry.io/collector/component/componenttest v0.139.0 + go.opentelemetry.io/collector/config/configoptional v1.45.0 go.opentelemetry.io/collector/config/configtelemetry v0.139.0 go.opentelemetry.io/collector/confmap v1.45.0 go.opentelemetry.io/collector/consumer v1.45.0 @@ -43,7 +44,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.45.0 // indirect - go.opentelemetry.io/collector/config/configoptional v1.45.0 // indirect go.opentelemetry.io/collector/config/configretry v1.45.0 // indirect go.opentelemetry.io/collector/confmap/xconfmap v0.139.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.139.0 // indirect diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 147ff280a7e..884ba46ca33 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -47,7 +48,7 @@ type BaseExporter struct { retryCfg configretry.BackOffConfig queueBatchSettings queuebatch.Settings[request.Request] - queueCfg queuebatch.Config + queueCfg configoptional.Optional[queuebatch.Config] } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) { @@ -82,19 +83,19 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende return nil, err } - if be.queueCfg.Batch.HasValue() { + if be.queueCfg.HasValue() && be.queueCfg.Get().Batch.HasValue() { // Batcher mutates the data. be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) } - if be.queueCfg.Enabled { + if be.queueCfg.HasValue() { qSet := queuebatch.AllSettings[request.Request]{ Settings: be.queueBatchSettings, Signal: signal, ID: set.ID, Telemetry: set.TelemetrySettings, } - be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender) + be.QueueSender, err = NewQueueSender(qSet, *be.queueCfg.Get(), be.ExportFailureMessage, be.firstSender) if err != nil { return nil, err } @@ -191,7 +192,7 @@ func WithRetry(config configretry.BackOffConfig) Option { // WithQueue overrides the default queuebatch.Config for an exporter. // The default queuebatch.Config is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. -func WithQueue(cfg queuebatch.Config) Option { +func WithQueue(cfg configoptional.Optional[queuebatch.Config]) Option { return func(o *BaseExporter) error { if o.queueBatchSettings.Encoding == nil { return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead") @@ -204,13 +205,13 @@ func WithQueue(cfg queuebatch.Config) Option { // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option { +func WithQueueBatch(cfg configoptional.Optional[queuebatch.Config], set queuebatch.Settings[request.Request]) Option { return func(o *BaseExporter) error { - if !cfg.Enabled { + if !cfg.HasValue() { o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - if cfg.StorageID != nil && set.Encoding == nil { + if cfg.Get().StorageID != nil && set.Encoding == nil { return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled") } o.queueBatchSettings = set diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 5a5da91a9f8..28b42e834e8 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -54,7 +55,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) { qCfg := NewDefaultQueueConfig() storageID := component.NewID(component.MustNewType("test")) - qCfg.StorageID = &storageID + qCfg.Get().StorageID = &storageID _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport, WithQueueBatchSettings(newFakeQueueBatch()), WithRetry(configretry.NewDefaultBackOffConfig()), @@ -69,7 +70,7 @@ func TestBaseExporterLogging(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false qCfg := NewDefaultQueueConfig() - qCfg.WaitForResult = true + qCfg.Get().WaitForResult = true bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport, WithQueueBatchSettings(newFakeQueueBatch()), WithQueue(qCfg), @@ -98,9 +99,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { queueOptions: []Option{ WithQueueBatchSettings(newFakeQueueBatch()), func() Option { - qs := NewDefaultQueueConfig() - qs.Enabled = false - return WithQueue(qs) + return WithQueue(configoptional.None[queuebatch.Config]()) }(), }, }, @@ -108,9 +107,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { name: "WithRequestQueue", queueOptions: []Option{ func() Option { - qs := NewDefaultQueueConfig() - qs.Enabled = false - return WithQueueBatch(qs, newFakeQueueBatch()) + return WithQueueBatch(configoptional.None[queuebatch.Config](), newFakeQueueBatch()) }(), }, }, diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index c49c7265647..d2c04453d54 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -17,9 +17,8 @@ import ( // NewDefaultQueueConfig returns the default config for queuebatch.Config. // By default, the queue stores 1000 requests of telemetry and is non-blocking when full. -func NewDefaultQueueConfig() queuebatch.Config { - return queuebatch.Config{ - Enabled: true, +func NewDefaultQueueConfig() configoptional.Optional[queuebatch.Config] { + return configoptional.Some(queuebatch.Config{ Sizer: request.SizerTypeRequests, NumConsumers: 10, // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue @@ -32,7 +31,7 @@ func NewDefaultQueueConfig() queuebatch.Config { Sizer: request.SizerTypeItems, MinSize: 8192, }), - } + }) } func NewQueueSender( diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index ad6020b5a1f..64ffada49f5 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" @@ -31,8 +32,9 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) { } logger, observed := observer.New(zap.ErrorLevel) qSet.Telemetry.Logger = zap.New(logger) + qCfg := NewDefaultQueueConfig() be, err := NewQueueSender( - qSet, NewDefaultQueueConfig(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") })) + qSet, *qCfg.Get(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") })) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -46,14 +48,14 @@ func TestQueueConfig_Validate(t *testing.T) { qCfg := NewDefaultQueueConfig() require.NoError(t, qCfg.Validate()) - qCfg.NumConsumers = 0 + qCfg.Get().NumConsumers = 0 require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive") qCfg = NewDefaultQueueConfig() - qCfg.QueueSize = 0 + qCfg.Get().QueueSize = 0 require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive") // Confirm Validate doesn't return error with invalid config when feature is disabled - qCfg.Enabled = false + qCfg = configoptional.None[queuebatch.Config]() assert.NoError(t, qCfg.Validate()) } diff --git a/exporter/exporterhelper/internal/queuebatch/config.go b/exporter/exporterhelper/internal/queuebatch/config.go index ecef3ce3040..78569b85d82 100644 --- a/exporter/exporterhelper/internal/queuebatch/config.go +++ b/exporter/exporterhelper/internal/queuebatch/config.go @@ -16,9 +16,6 @@ import ( // Config defines configuration for queueing and batching incoming requests. type Config struct { - // Enabled indicates whether to not enqueue and batch before exporting. - Enabled bool `mapstructure:"enabled"` - // WaitForResult determines if incoming requests are blocked until the request is processed or not. // Currently, this option is not available when persistent queue is configured using the storage configuration. WaitForResult bool `mapstructure:"wait_for_result"` @@ -66,10 +63,6 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error { // Validate checks if the Config is valid func (cfg *Config) Validate() error { - if !cfg.Enabled { - return nil - } - if cfg.NumConsumers <= 0 { return errors.New("`num_consumers` must be positive") } diff --git a/exporter/exporterhelper/internal/queuebatch/config_test.go b/exporter/exporterhelper/internal/queuebatch/config_test.go index f55f7bcb8ff..b43bb5aa073 100644 --- a/exporter/exporterhelper/internal/queuebatch/config_test.go +++ b/exporter/exporterhelper/internal/queuebatch/config_test.go @@ -50,10 +50,6 @@ func TestConfig_Validate(t *testing.T) { cfg = newTestConfig() cfg.Sizer = request.SizerTypeBytes require.NoError(t, xconfmap.Validate(cfg)) - - // Confirm Validate doesn't return error with invalid config when feature is disabled - cfg.Enabled = false - assert.NoError(t, xconfmap.Validate(cfg)) } func TestBatchConfig_Validate(t *testing.T) { @@ -96,9 +92,8 @@ func newTestBatchConfig() BatchConfig { } func TestUnmarshal(t *testing.T) { - newBaseCfg := func() Config { - return Config{ - Enabled: true, + newBaseCfg := func() configoptional.Optional[Config] { + return configoptional.Some(Config{ Sizer: request.SizerTypeRequests, NumConsumers: 10, QueueSize: 1_000, @@ -107,38 +102,38 @@ func TestUnmarshal(t *testing.T) { Sizer: request.SizerTypeItems, MinSize: 8192, }), - } + }) } tests := []struct { path string expectedErr string - expectedCfg func() Config + expectedCfg func() configoptional.Optional[Config] }{ { path: "batch_set_empty_explicit_sizer.yaml", - expectedCfg: func() Config { + expectedCfg: func() configoptional.Optional[Config] { cfg := newBaseCfg() - cfg.Sizer = request.SizerTypeBytes + cfg.Get().Sizer = request.SizerTypeBytes // Batch is set, sizer is not overridden - cfg.Batch.GetOrInsertDefault() + cfg.Get().Batch.GetOrInsertDefault() return cfg }, }, { path: "batch_set_empty_no_explicit_sizer.yaml", - expectedCfg: func() Config { + expectedCfg: func() configoptional.Optional[Config] { cfg := newBaseCfg() - cfg.Batch.GetOrInsertDefault() + cfg.Get().Batch.GetOrInsertDefault() return cfg }, }, { path: "batch_set_nonempty_explicit_sizer.yaml", - expectedCfg: func() Config { + expectedCfg: func() configoptional.Optional[Config] { cfg := newBaseCfg() - cfg.Sizer = request.SizerTypeBytes - cfg.QueueSize = 2000 - cfg.Batch = configoptional.Some(BatchConfig{ + cfg.Get().Sizer = request.SizerTypeBytes + cfg.Get().QueueSize = 2000 + cfg.Get().Batch = configoptional.Some(BatchConfig{ FlushTimeout: 200 * time.Millisecond, // Sizer has been overridden by parent sizer Sizer: request.SizerTypeBytes, @@ -149,10 +144,10 @@ func TestUnmarshal(t *testing.T) { }, { path: "batch_set_nonempty_no_explicit_sizer.yaml", - expectedCfg: func() Config { + expectedCfg: func() configoptional.Optional[Config] { cfg := newBaseCfg() - cfg.QueueSize = 2000 - cfg.Batch = configoptional.Some(BatchConfig{ + cfg.Get().QueueSize = 2000 + cfg.Get().Batch = configoptional.Some(BatchConfig{ FlushTimeout: 200 * time.Millisecond, // Sizer has NOT been overridden by parent sizer Sizer: request.SizerTypeItems, diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 0fcaf70ec1f..2ec8914fb65 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -124,7 +124,6 @@ func TestQueueBatchDifferentSizers(t *testing.T) { // because the bytes size is used for the queue, // but split because the items size is used for batch. cfg := Config{ - Enabled: true, WaitForResult: false, Sizer: request.SizerTypeBytes, QueueSize: 100, @@ -604,7 +603,6 @@ func TestQueueBatchTimerFlush(t *testing.T) { func newTestConfig() Config { return Config{ - Enabled: true, WaitForResult: false, Sizer: request.SizerTypeItems, NumConsumers: runtime.NumCPU(), diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 132712b567f..6b176c79b85 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -101,7 +101,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.Get().StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index dc559d9f9f3..2aeb99e8c61 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -101,7 +101,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.Get().StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_metrics", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index 2715cdc3c93..ce4eee560cd 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" @@ -14,7 +15,7 @@ import ( // WithQueue overrides the default QueueBatchConfig for an exporter. // The default QueueBatchConfig is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. -func WithQueue(config QueueBatchConfig) Option { +func WithQueue(config configoptional.Optional[QueueBatchConfig]) Option { return internal.WithQueue(config) } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index d4cb14c3b2f..0a301e562a9 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -103,7 +103,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.Get().StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index 0e04048b5e7..db87b0b7868 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.45.0 go.opentelemetry.io/collector/component/componenttest v0.139.0 + go.opentelemetry.io/collector/config/configoptional v1.45.0 go.opentelemetry.io/collector/consumer v1.45.0 go.opentelemetry.io/collector/consumer/consumererror v0.139.0 go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.139.0 @@ -46,7 +47,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.45.0 // indirect - go.opentelemetry.io/collector/config/configoptional v1.45.0 // indirect go.opentelemetry.io/collector/config/configretry v1.45.0 // indirect go.opentelemetry.io/collector/confmap v1.45.0 // indirect go.opentelemetry.io/collector/confmap/xconfmap v0.139.0 // indirect diff --git a/exporter/exporterhelper/xexporterhelper/new_request.go b/exporter/exporterhelper/xexporterhelper/new_request.go index 7fe7472eeb5..044bc13634c 100644 --- a/exporter/exporterhelper/xexporterhelper/new_request.go +++ b/exporter/exporterhelper/xexporterhelper/new_request.go @@ -6,6 +6,7 @@ package xexporterhelper // import "go.opentelemetry.io/collector/exporter/export import ( "context" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" @@ -83,6 +84,6 @@ func NewTracesQueueBatchSettings() QueueBatchSettings { // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithQueueBatch(cfg exporterhelper.QueueBatchConfig, set QueueBatchSettings) exporterhelper.Option { +func WithQueueBatch(cfg configoptional.Optional[exporterhelper.QueueBatchConfig], set QueueBatchSettings) exporterhelper.Option { return internal.WithQueueBatch(cfg, set) } diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index a48d5a76075..b422e97e590 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -171,7 +171,7 @@ func TestProfiles_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := exporterhelper.NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.Get().StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index 7bcd17c0235..553a8a82eb4 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -13,16 +13,17 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" ) // Config defines configuration for OTLP exporter. type Config struct { - TimeoutConfig exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. - QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` - RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` - ClientConfig configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + TimeoutConfig exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"` + RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` + ClientConfig configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. // prevent unkeyed literal initialization _ struct{} diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index c87e0d9de37..5afbbe845f0 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -51,8 +51,7 @@ func TestUnmarshalConfig(t *testing.T) { MaxInterval: 1 * time.Minute, MaxElapsedTime: 10 * time.Minute, }, - QueueConfig: exporterhelper.QueueBatchConfig{ - Enabled: true, + QueueConfig: configoptional.Some(exporterhelper.QueueBatchConfig{ Sizer: exporterhelper.RequestSizerTypeItems, NumConsumers: 2, QueueSize: 100000, @@ -62,7 +61,7 @@ func TestUnmarshalConfig(t *testing.T) { MinSize: 1000, MaxSize: 10000, }), - }, + }), ClientConfig: configgrpc.ClientConfig{ Headers: configopaque.MapList{ {Name: "another", Value: "somevalue"}, @@ -102,8 +101,7 @@ func TestUnmarshalDefaultBatchConfig(t *testing.T) { Timeout: 10 * time.Second, }, RetryConfig: configretry.NewDefaultBackOffConfig(), - QueueConfig: exporterhelper.QueueBatchConfig{ - Enabled: true, + QueueConfig: configoptional.Some(exporterhelper.QueueBatchConfig{ Sizer: exporterhelper.RequestSizerTypeRequests, QueueSize: 1000, NumConsumers: 10, @@ -112,7 +110,7 @@ func TestUnmarshalDefaultBatchConfig(t *testing.T) { Sizer: exporterhelper.RequestSizerTypeItems, MinSize: 8192, }), - }, + }), ClientConfig: configgrpc.ClientConfig{ Endpoint: "1.2.3.4:1234", Compression: "gzip", diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index 30c5b718411..2cf121b34c7 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -28,8 +28,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/xexporter" "go.opentelemetry.io/collector/pdata/plog" @@ -309,7 +311,7 @@ func TestSendTraces(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see any errors. - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLS: configtls.ClientConfig{ @@ -481,7 +483,7 @@ func TestSendMetrics(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeMetrics // otherwise we will not see any errors. - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLS: configtls.ClientConfig{ @@ -586,7 +588,7 @@ func TestSendTraceDataServerDownAndUp(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see the error. - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLS: configtls.ClientConfig{ @@ -778,7 +780,7 @@ func TestSendLogData(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeLogs // otherwise we will not see any errors. - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLS: configtls.ClientConfig{ @@ -882,7 +884,7 @@ func TestSendProfiles(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) // Disable queuing to ensure that we execute the request when calling ConsumeProfiles // otherwise we will not see any errors. - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), TLS: configtls.ClientConfig{ diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 0417fe1dd75..8ce72fa28b4 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" ) @@ -45,9 +46,9 @@ func (e *EncodingType) UnmarshalText(text []byte) error { // Config defines configuration for OTLP/HTTP exporter. type Config struct { - ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. - QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` - RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` + ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"` + RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` // The URL to send traces to. If omitted the Endpoint + "/v1/traces" will be used. TracesEndpoint string `mapstructure:"traces_endpoint"` diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 1957053163a..81fb4252e79 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -53,8 +53,7 @@ func TestUnmarshalConfig(t *testing.T) { MaxInterval: 1 * time.Minute, MaxElapsedTime: 10 * time.Minute, }, - QueueConfig: exporterhelper.QueueBatchConfig{ - Enabled: true, + QueueConfig: configoptional.Some(exporterhelper.QueueBatchConfig{ Sizer: exporterhelper.RequestSizerTypeRequests, NumConsumers: 2, QueueSize: 10, @@ -63,7 +62,7 @@ func TestUnmarshalConfig(t *testing.T) { FlushTimeout: 200 * time.Millisecond, MinSize: 8192, }), - }, + }), Encoding: EncodingProto, ClientConfig: confighttp.ClientConfig{ Headers: configopaque.MapList{ diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 3e4b1e690b6..90364a050bb 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -35,7 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, 300*time.Second, ocfg.RetryConfig.MaxElapsedTime, "default retry MaxElapsedTime") assert.Equal(t, 5*time.Second, ocfg.RetryConfig.InitialInterval, "default retry InitialInterval") assert.Equal(t, 30*time.Second, ocfg.RetryConfig.MaxInterval, "default retry MaxInterval") - assert.True(t, ocfg.QueueConfig.Enabled, "default sending queue is enabled") + assert.True(t, ocfg.QueueConfig.HasValue(), "default sending queue is enabled") assert.Equal(t, EncodingProto, ocfg.Encoding) assert.Equal(t, configcompression.TypeGzip, ocfg.ClientConfig.Compression) } diff --git a/internal/e2e/consume_contract_test.go b/internal/e2e/consume_contract_test.go index 5194b9e772b..c02ea5b5f69 100644 --- a/internal/e2e/consume_contract_test.go +++ b/internal/e2e/consume_contract_test.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -23,7 +24,7 @@ func testExporterConfig(endpoint string) component.Config { retryConfig := configretry.NewDefaultBackOffConfig() retryConfig.InitialInterval = time.Millisecond // interval is short for the test purposes return &otlpexporter.Config{ - QueueConfig: exporterhelper.QueueBatchConfig{Enabled: false}, + QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](), RetryConfig: retryConfig, ClientConfig: configgrpc.ClientConfig{ Endpoint: endpoint, diff --git a/internal/e2e/error_propagation_test.go b/internal/e2e/error_propagation_test.go index bb2180416d2..0991334a43d 100644 --- a/internal/e2e/error_propagation_test.go +++ b/internal/e2e/error_propagation_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/exporter/otlphttpexporter" @@ -169,7 +170,7 @@ func createGRPCExporter(t *testing.T, s *status.Status) consumer.Logs { f := otlpexporter.NewFactory() cfg := f.CreateDefaultConfig().(*otlpexporter.Config) - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.RetryConfig.Enabled = false cfg.ClientConfig = configgrpc.ClientConfig{ Endpoint: ln.Addr().String(), @@ -203,7 +204,7 @@ func createHTTPExporter(t *testing.T, code int) consumer.Logs { f := otlphttpexporter.NewFactory() cfg := f.CreateDefaultConfig().(*otlphttpexporter.Config) - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.RetryConfig.Enabled = false cfg.Encoding = otlphttpexporter.EncodingProto cfg.LogsEndpoint = srv.URL + "/v1/logs" diff --git a/internal/e2e/otlphttp_test.go b/internal/e2e/otlphttp_test.go index 0f7222e7b69..27251bbdc17 100644 --- a/internal/e2e/otlphttp_test.go +++ b/internal/e2e/otlphttp_test.go @@ -29,9 +29,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/otlphttpexporter" "go.opentelemetry.io/collector/internal/testutil" @@ -389,7 +391,7 @@ func startLogs(t *testing.T, baseURL, overrideURL string) exporter.Logs { func createConfig(baseURL string, defaultCfg component.Config) *otlphttpexporter.Config { cfg := defaultCfg.(*otlphttpexporter.Config) cfg.ClientConfig.Endpoint = baseURL - cfg.QueueConfig.Enabled = false + cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]() cfg.RetryConfig.Enabled = false return cfg } From 24bcec82df5486784df2b934f558835a32d98040 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 13 Nov 2025 17:58:06 +0100 Subject: [PATCH 2/7] fix example --- exporter/example_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/example_test.go b/exporter/example_test.go index 9a21c405609..a605423cd1b 100644 --- a/exporter/example_test.go +++ b/exporter/example_test.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -22,7 +23,7 @@ var typeStr = component.MustNewType("example") // exampleConfig holds configuration settings for the exporter. type exampleConfig struct { - QueueSettings exporterhelper.QueueBatchConfig + QueueSettings configoptional.Optional[exporterhelper.QueueBatchConfig] BackOffConfig configretry.BackOffConfig } From 7efc984dcccb25f7d9978db192a4cf9b45d7a19a Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 13 Nov 2025 18:07:40 +0100 Subject: [PATCH 3/7] fix CI --- ...l-for-exporterhelper-queuebatchconfig.yaml | 26 +++++++++++++++++++ exporter/go.mod | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 .chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml diff --git a/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml b/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml new file mode 100644 index 00000000000..64ca0a730cc --- /dev/null +++ b/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml @@ -0,0 +1,26 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pkg/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use `configoptional.Optional` for the `exporterhelper.QueueBatchConfig` + +# One or more tracking issues or pull requests related to the change +issues: [14155] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + You may need to change the field type of your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/go.mod b/exporter/go.mod index 398fcd8bd8e..ea2617654b7 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.45.0 + go.opentelemetry.io/collector/config/configoptional v1.45.0 go.opentelemetry.io/collector/config/configretry v1.45.0 go.opentelemetry.io/collector/consumer v1.45.0 go.opentelemetry.io/collector/consumer/consumertest v0.139.0 @@ -31,7 +32,6 @@ require ( github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/client v1.45.0 // indirect - go.opentelemetry.io/collector/config/configoptional v1.45.0 // indirect go.opentelemetry.io/collector/confmap v1.45.0 // indirect go.opentelemetry.io/collector/confmap/xconfmap v0.139.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.139.0 // indirect From d7a496bd1b380c0ef9d785ffcf746f2336662bea Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 18 Nov 2025 13:47:15 +0100 Subject: [PATCH 4/7] Apply fix from Jade --- exporter/debugexporter/config_test.go | 3 ++- exporter/debugexporter/factory.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/exporter/debugexporter/config_test.go b/exporter/debugexporter/config_test.go index fa06517138d..9dbfed17348 100644 --- a/exporter/debugexporter/config_test.go +++ b/exporter/debugexporter/config_test.go @@ -25,6 +25,7 @@ func TestUnmarshalDefaultConfig(t *testing.T) { } func TestUnmarshalConfig(t *testing.T) { + queueCfg := exporterhelper.NewDefaultQueueConfig() tests := []struct { filename string cfg *Config @@ -36,7 +37,7 @@ func TestUnmarshalConfig(t *testing.T) { Verbosity: configtelemetry.LevelDetailed, SamplingInitial: 10, SamplingThereafter: 50, - QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](), + QueueConfig: configoptional.Default(*queueCfg.Get()), }, }, { diff --git a/exporter/debugexporter/factory.go b/exporter/debugexporter/factory.go index fe5e70fd56d..a2f4c5995a9 100644 --- a/exporter/debugexporter/factory.go +++ b/exporter/debugexporter/factory.go @@ -43,12 +43,13 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + queueCfg := exporterhelper.NewDefaultQueueConfig() return &Config{ Verbosity: configtelemetry.LevelBasic, SamplingInitial: defaultSamplingInitial, SamplingThereafter: defaultSamplingThereafter, UseInternalLogger: true, - QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](), + QueueConfig: configoptional.Default(*queueCfg.Get()), } } From b5a6743f6945aff22d9de884c47f5c718f4c7c97 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 19 Nov 2025 12:22:40 +0100 Subject: [PATCH 5/7] Update .chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- ...-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml b/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml index 64ca0a730cc..aa8c53653ce 100644 --- a/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml +++ b/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml @@ -16,7 +16,7 @@ issues: [14155] # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: | - You may need to change the field type of your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` + It's recommended to change the field type in your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` to keep the `enabled` subfield. # Optional: The change log or logs in which this entry should be included. # e.g. '[user]' or '[user, api]' From faac079c71c1d0ab23e98003100ea0273357ae54 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 24 Nov 2025 10:31:14 -0800 Subject: [PATCH 6/7] Unwrap NewDefaultQueueConfig() --- exporter/debugexporter/config_test.go | 2 +- exporter/debugexporter/exporter_test.go | 5 +++-- exporter/debugexporter/factory.go | 3 +-- .../internal/base_exporter_test.go | 10 +++++----- .../exporterhelper/internal/queue_sender.go | 20 ++++++++++--------- .../internal/queue_sender_test.go | 10 +++++----- exporter/exporterhelper/logs_test.go | 3 ++- exporter/exporterhelper/metrics_test.go | 5 +++-- exporter/exporterhelper/traces_test.go | 5 +++-- .../xexporterhelper/profiles_test.go | 5 +++-- exporter/otlpexporter/factory.go | 2 +- exporter/otlpexporter/factory_test.go | 2 +- exporter/otlphttpexporter/factory.go | 3 ++- 13 files changed, 41 insertions(+), 34 deletions(-) diff --git a/exporter/debugexporter/config_test.go b/exporter/debugexporter/config_test.go index 9dbfed17348..ffd98e14832 100644 --- a/exporter/debugexporter/config_test.go +++ b/exporter/debugexporter/config_test.go @@ -37,7 +37,7 @@ func TestUnmarshalConfig(t *testing.T) { Verbosity: configtelemetry.LevelDetailed, SamplingInitial: 10, SamplingThereafter: 50, - QueueConfig: configoptional.Default(*queueCfg.Get()), + QueueConfig: configoptional.Default(queueCfg), }, }, { diff --git a/exporter/debugexporter/exporter_test.go b/exporter/debugexporter/exporter_test.go index f95e16891a4..61372c24124 100644 --- a/exporter/debugexporter/exporter_test.go +++ b/exporter/debugexporter/exporter_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/exporter/debugexporter/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -112,7 +113,7 @@ func createTestCases() []testCase { name: "default config", config: func() *Config { c := createDefaultConfig().(*Config) - c.QueueConfig = exporterhelper.NewDefaultQueueConfig() + c.QueueConfig = configoptional.Some(exporterhelper.NewDefaultQueueConfig()) c.QueueConfig.Get().QueueSize = 10 return c }(), @@ -121,7 +122,7 @@ func createTestCases() []testCase { name: "don't use internal logger", config: func() *Config { cfg := createDefaultConfig().(*Config) - cfg.QueueConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueConfig = configoptional.Some(exporterhelper.NewDefaultQueueConfig()) cfg.QueueConfig.Get().QueueSize = 10 cfg.UseInternalLogger = false return cfg diff --git a/exporter/debugexporter/factory.go b/exporter/debugexporter/factory.go index a2f4c5995a9..0e5fa79a1be 100644 --- a/exporter/debugexporter/factory.go +++ b/exporter/debugexporter/factory.go @@ -43,13 +43,12 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { - queueCfg := exporterhelper.NewDefaultQueueConfig() return &Config{ Verbosity: configtelemetry.LevelBasic, SamplingInitial: defaultSamplingInitial, SamplingThereafter: defaultSamplingThereafter, UseInternalLogger: true, - QueueConfig: configoptional.Default(*queueCfg.Get()), + QueueConfig: configoptional.Default(exporterhelper.NewDefaultQueueConfig()), } } diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 28b42e834e8..70aa71ba700 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -50,16 +50,16 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) { require.NoError(t, err) require.Nil(t, bs.queueBatchSettings.Encoding) _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport, - WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) + WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(configoptional.Some(NewDefaultQueueConfig()))) require.Error(t, err) qCfg := NewDefaultQueueConfig() storageID := component.NewID(component.MustNewType("test")) - qCfg.Get().StorageID = &storageID + qCfg.StorageID = &storageID _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport, WithQueueBatchSettings(newFakeQueueBatch()), WithRetry(configretry.NewDefaultBackOffConfig()), - WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{})) + WithQueueBatch(configoptional.Some(qCfg), queuebatch.Settings[request.Request]{})) require.Error(t, err) } @@ -70,10 +70,10 @@ func TestBaseExporterLogging(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false qCfg := NewDefaultQueueConfig() - qCfg.Get().WaitForResult = true + qCfg.WaitForResult = true bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport, WithQueueBatchSettings(newFakeQueueBatch()), - WithQueue(qCfg), + WithQueue(configoptional.Some(qCfg)), WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index d2c04453d54..804f34a7233 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -16,14 +16,16 @@ import ( ) // NewDefaultQueueConfig returns the default config for queuebatch.Config. -// By default, the queue stores 1000 requests of telemetry and is non-blocking when full. -func NewDefaultQueueConfig() configoptional.Optional[queuebatch.Config] { - return configoptional.Some(queuebatch.Config{ - Sizer: request.SizerTypeRequests, - NumConsumers: 10, - // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue - // This can be estimated at 1-4 GB worth of maximum memory usage - // This default is probably still too high, and may be adjusted further down in a future release +// By default: +// +// - the queue stores 1000 requests of telemetry +// - is non-blocking when full +// - concurrent exports limited to 10 +// - emits batches of 8192 items, timeout 200ms +func NewDefaultQueueConfig() queuebatch.Config { + return queuebatch.Config{ + Sizer: request.SizerTypeRequests, + NumConsumers: 10, QueueSize: 1_000, BlockOnOverflow: false, Batch: configoptional.Default(queuebatch.BatchConfig{ @@ -31,7 +33,7 @@ func NewDefaultQueueConfig() configoptional.Optional[queuebatch.Config] { Sizer: request.SizerTypeItems, MinSize: 8192, }), - }) + } } func NewQueueSender( diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 64ffada49f5..94546f7a47a 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -34,7 +34,7 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) { qSet.Telemetry.Logger = zap.New(logger) qCfg := NewDefaultQueueConfig() be, err := NewQueueSender( - qSet, *qCfg.Get(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") })) + qSet, qCfg, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") })) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -48,14 +48,14 @@ func TestQueueConfig_Validate(t *testing.T) { qCfg := NewDefaultQueueConfig() require.NoError(t, qCfg.Validate()) - qCfg.Get().NumConsumers = 0 + qCfg.NumConsumers = 0 require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive") qCfg = NewDefaultQueueConfig() - qCfg.Get().QueueSize = 0 + qCfg.QueueSize = 0 require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive") // Confirm Validate doesn't return error with invalid config when feature is disabled - qCfg = configoptional.None[queuebatch.Config]() - assert.NoError(t, qCfg.Validate()) + noCfg := configoptional.None[queuebatch.Config]() + assert.NoError(t, noCfg.Validate()) } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 6b176c79b85..8182dc36577 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" @@ -99,7 +100,7 @@ func TestLogs_Default_ReturnError(t *testing.T) { func TestLogs_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite - qCfg := NewDefaultQueueConfig() + qCfg := configoptional.Some(NewDefaultQueueConfig()) storageID := component.MustNewIDWithName("file_storage", "storage") qCfg.Get().StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 2aeb99e8c61..04b3d4d5127 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" @@ -101,7 +102,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.Get().StorageID = &storageID + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_metrics", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ @@ -148,7 +149,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) { }) ms := consumertest.MetricsSink{} - te, err := NewMetrics(context.Background(), set, &fakeMetricsConfig, ms.ConsumeMetrics, WithQueue(qCfg)) + te, err := NewMetrics(context.Background(), set, &fakeMetricsConfig, ms.ConsumeMetrics, WithQueue(configoptional.Some(qCfg))) require.NoError(t, err) require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 0a301e562a9..df3c8af89d8 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" @@ -103,7 +104,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.Get().StorageID = &storageID + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ @@ -150,7 +151,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) { }) ts := consumertest.TracesSink{} - te, err := NewTraces(context.Background(), set, &fakeTracesConfig, ts.ConsumeTraces, WithQueue(qCfg)) + te, err := NewTraces(context.Background(), set, &fakeTracesConfig, ts.ConsumeTraces, WithQueue(configoptional.Some(qCfg))) require.NoError(t, err) require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index b422e97e590..c296a5c82e9 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumererror/xconsumererror" @@ -171,7 +172,7 @@ func TestProfiles_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := exporterhelper.NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.Get().StorageID = &storageID + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ @@ -218,7 +219,7 @@ func TestProfiles_WithPersistentQueue(t *testing.T) { }) ts := consumertest.ProfilesSink{} - te, err := NewProfiles(context.Background(), set, &fakeProfilesExporterConfig, ts.ConsumeProfiles, exporterhelper.WithQueue(qCfg)) + te, err := NewProfiles(context.Background(), set, &fakeProfilesExporterConfig, ts.ConsumeProfiles, exporterhelper.WithQueue(configoptional.Some(qCfg))) require.NoError(t, err) require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 3d5cc851214..75cf6fbaeae 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -44,7 +44,7 @@ func createDefaultConfig() component.Config { return &Config{ TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), RetryConfig: configretry.NewDefaultBackOffConfig(), - QueueConfig: exporterhelper.NewDefaultQueueConfig(), + QueueConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()), ClientConfig: clientCfg, } } diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go index 0a8e82d22fd..4ec179c43fd 100644 --- a/exporter/otlpexporter/factory_test.go +++ b/exporter/otlpexporter/factory_test.go @@ -33,7 +33,7 @@ func TestCreateDefaultConfig(t *testing.T) { ocfg, ok := factory.CreateDefaultConfig().(*Config) assert.True(t, ok) assert.Equal(t, configretry.NewDefaultBackOffConfig(), ocfg.RetryConfig) - assert.Equal(t, exporterhelper.NewDefaultQueueConfig(), ocfg.QueueConfig) + assert.Equal(t, configoptional.Some(exporterhelper.NewDefaultQueueConfig()), ocfg.QueueConfig) assert.Equal(t, exporterhelper.NewDefaultTimeoutConfig(), ocfg.TimeoutConfig) assert.Equal(t, configcompression.TypeGzip, ocfg.ClientConfig.Compression) } diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 3db1c4d0f56..b33485b568f 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -44,7 +45,7 @@ func createDefaultConfig() component.Config { return &Config{ RetryConfig: configretry.NewDefaultBackOffConfig(), - QueueConfig: exporterhelper.NewDefaultQueueConfig(), + QueueConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()), Encoding: EncodingProto, ClientConfig: clientConfig, } From bef7f112a2280c5a321242f4e4079929f74a7d2d Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 24 Nov 2025 10:40:14 -0800 Subject: [PATCH 7/7] chlog --- ...-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml b/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml index aa8c53653ce..6b91889360b 100644 --- a/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml +++ b/.chloggen/mx-psi_configoptional-for-exporterhelper-queuebatchconfig.yaml @@ -16,7 +16,7 @@ issues: [14155] # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: | - It's recommended to change the field type in your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` to keep the `enabled` subfield. + It's recommended to change the field type in your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` to keep the `enabled` subfield. Use configoptional.Some(exporterhelper.NewDefaultQueueConfig()) to enable by default. Use configoptional.Default(exporterhelper.NewDefaultQueueConfig()) to disable by default. # Optional: The change log or logs in which this entry should be included. # e.g. '[user]' or '[user, api]'