Skip to content

Commit d7e2671

Browse files
committed
[exporterhelper] Use configoptional.Optional for exporterhelper QueueBatchConfig
1 parent 3d9d519 commit d7e2671

28 files changed

+91
-98
lines changed

exporter/debugexporter/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/config/configoptional"
1011
"go.opentelemetry.io/collector/config/configtelemetry"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper"
1213
)
@@ -33,7 +34,7 @@ type Config struct {
3334
// UseInternalLogger defines whether the exporter sends the output to the collector's internal logger.
3435
UseInternalLogger bool `mapstructure:"use_internal_logger"`
3536

36-
QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
37+
QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"`
3738

3839
// prevent unkeyed literal initialization
3940
_ struct{}

exporter/debugexporter/config_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
1212

13+
"go.opentelemetry.io/collector/config/configoptional"
1314
"go.opentelemetry.io/collector/config/configtelemetry"
1415
"go.opentelemetry.io/collector/confmap"
1516
"go.opentelemetry.io/collector/confmap/confmaptest"
@@ -24,8 +25,6 @@ func TestUnmarshalDefaultConfig(t *testing.T) {
2425
}
2526

2627
func TestUnmarshalConfig(t *testing.T) {
27-
queueCfg := exporterhelper.NewDefaultQueueConfig()
28-
queueCfg.Enabled = false
2928
tests := []struct {
3029
filename string
3130
cfg *Config
@@ -37,7 +36,7 @@ func TestUnmarshalConfig(t *testing.T) {
3736
Verbosity: configtelemetry.LevelDetailed,
3837
SamplingInitial: 10,
3938
SamplingThereafter: 50,
40-
QueueConfig: queueCfg,
39+
QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](),
4140
},
4241
},
4342
{

exporter/debugexporter/exporter_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"go.opentelemetry.io/collector/config/configtelemetry"
1616
"go.opentelemetry.io/collector/exporter/debugexporter/internal/metadata"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1718
"go.opentelemetry.io/collector/exporter/exportertest"
1819
"go.opentelemetry.io/collector/pdata/plog"
1920
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -111,15 +112,17 @@ func createTestCases() []testCase {
111112
name: "default config",
112113
config: func() *Config {
113114
c := createDefaultConfig().(*Config)
114-
c.QueueConfig.QueueSize = 10
115+
c.QueueConfig = exporterhelper.NewDefaultQueueConfig()
116+
c.QueueConfig.Get().QueueSize = 10
115117
return c
116118
}(),
117119
},
118120
{
119121
name: "don't use internal logger",
120122
config: func() *Config {
121123
cfg := createDefaultConfig().(*Config)
122-
cfg.QueueConfig.QueueSize = 10
124+
cfg.QueueConfig = exporterhelper.NewDefaultQueueConfig()
125+
cfg.QueueConfig.Get().QueueSize = 10
123126
cfg.UseInternalLogger = false
124127
return cfg
125128
}(),

exporter/debugexporter/factory.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/zap/zapcore"
1212

1313
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/config/configoptional"
1415
"go.opentelemetry.io/collector/config/configtelemetry"
1516
"go.opentelemetry.io/collector/consumer"
1617
"go.opentelemetry.io/collector/exporter"
@@ -42,15 +43,12 @@ func NewFactory() exporter.Factory {
4243
}
4344

4445
func createDefaultConfig() component.Config {
45-
queueCfg := exporterhelper.NewDefaultQueueConfig()
46-
queueCfg.Enabled = false
47-
4846
return &Config{
4947
Verbosity: configtelemetry.LevelBasic,
5048
SamplingInitial: defaultSamplingInitial,
5149
SamplingThereafter: defaultSamplingThereafter,
5250
UseInternalLogger: true,
53-
QueueConfig: queueCfg,
51+
QueueConfig: configoptional.None[exporterhelper.QueueBatchConfig](),
5452
}
5553
}
5654

exporter/debugexporter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/stretchr/testify v1.11.1
77
go.opentelemetry.io/collector/component v1.45.0
88
go.opentelemetry.io/collector/component/componenttest v0.139.0
9+
go.opentelemetry.io/collector/config/configoptional v1.45.0
910
go.opentelemetry.io/collector/config/configtelemetry v0.139.0
1011
go.opentelemetry.io/collector/confmap v1.45.0
1112
go.opentelemetry.io/collector/consumer v1.45.0
@@ -43,7 +44,6 @@ require (
4344
github.com/pmezard/go-difflib v1.0.0 // indirect
4445
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
4546
go.opentelemetry.io/collector/client v1.45.0 // indirect
46-
go.opentelemetry.io/collector/config/configoptional v1.45.0 // indirect
4747
go.opentelemetry.io/collector/config/configretry v1.45.0 // indirect
4848
go.opentelemetry.io/collector/confmap/xconfmap v0.139.0 // indirect
4949
go.opentelemetry.io/collector/consumer/consumererror v0.139.0 // indirect

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/zap"
1212

1313
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/config/configoptional"
1415
"go.opentelemetry.io/collector/config/configretry"
1516
"go.opentelemetry.io/collector/consumer"
1617
"go.opentelemetry.io/collector/exporter"
@@ -47,7 +48,7 @@ type BaseExporter struct {
4748
retryCfg configretry.BackOffConfig
4849

4950
queueBatchSettings queuebatch.Settings[request.Request]
50-
queueCfg queuebatch.Config
51+
queueCfg configoptional.Optional[queuebatch.Config]
5152
}
5253

5354
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
@@ -82,19 +83,19 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
8283
return nil, err
8384
}
8485

85-
if be.queueCfg.Batch.HasValue() {
86+
if be.queueCfg.HasValue() && be.queueCfg.Get().Batch.HasValue() {
8687
// Batcher mutates the data.
8788
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
8889
}
8990

90-
if be.queueCfg.Enabled {
91+
if be.queueCfg.HasValue() {
9192
qSet := queuebatch.AllSettings[request.Request]{
9293
Settings: be.queueBatchSettings,
9394
Signal: signal,
9495
ID: set.ID,
9596
Telemetry: set.TelemetrySettings,
9697
}
97-
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender)
98+
be.QueueSender, err = NewQueueSender(qSet, *be.queueCfg.Get(), be.ExportFailureMessage, be.firstSender)
9899
if err != nil {
99100
return nil, err
100101
}
@@ -191,7 +192,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
191192
// WithQueue overrides the default queuebatch.Config for an exporter.
192193
// The default queuebatch.Config is to disable queueing.
193194
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
194-
func WithQueue(cfg queuebatch.Config) Option {
195+
func WithQueue(cfg configoptional.Optional[queuebatch.Config]) Option {
195196
return func(o *BaseExporter) error {
196197
if o.queueBatchSettings.Encoding == nil {
197198
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
@@ -204,13 +205,13 @@ func WithQueue(cfg queuebatch.Config) Option {
204205
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
205206
// Experimental: This API is at the early stage of development and may change without backward compatibility
206207
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
207-
func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option {
208+
func WithQueueBatch(cfg configoptional.Optional[queuebatch.Config], set queuebatch.Settings[request.Request]) Option {
208209
return func(o *BaseExporter) error {
209-
if !cfg.Enabled {
210+
if !cfg.HasValue() {
210211
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
211212
return nil
212213
}
213-
if cfg.StorageID != nil && set.Encoding == nil {
214+
if cfg.Get().StorageID != nil && set.Encoding == nil {
214215
return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled")
215216
}
216217
o.queueBatchSettings = set

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/config/configoptional"
1819
"go.opentelemetry.io/collector/config/configretry"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2021
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -54,7 +55,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
5455

5556
qCfg := NewDefaultQueueConfig()
5657
storageID := component.NewID(component.MustNewType("test"))
57-
qCfg.StorageID = &storageID
58+
qCfg.Get().StorageID = &storageID
5859
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
5960
WithQueueBatchSettings(newFakeQueueBatch()),
6061
WithRetry(configretry.NewDefaultBackOffConfig()),
@@ -69,7 +70,7 @@ func TestBaseExporterLogging(t *testing.T) {
6970
rCfg := configretry.NewDefaultBackOffConfig()
7071
rCfg.Enabled = false
7172
qCfg := NewDefaultQueueConfig()
72-
qCfg.WaitForResult = true
73+
qCfg.Get().WaitForResult = true
7374
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
7475
WithQueueBatchSettings(newFakeQueueBatch()),
7576
WithQueue(qCfg),
@@ -98,19 +99,15 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
9899
queueOptions: []Option{
99100
WithQueueBatchSettings(newFakeQueueBatch()),
100101
func() Option {
101-
qs := NewDefaultQueueConfig()
102-
qs.Enabled = false
103-
return WithQueue(qs)
102+
return WithQueue(configoptional.None[queuebatch.Config]())
104103
}(),
105104
},
106105
},
107106
{
108107
name: "WithRequestQueue",
109108
queueOptions: []Option{
110109
func() Option {
111-
qs := NewDefaultQueueConfig()
112-
qs.Enabled = false
113-
return WithQueueBatch(qs, newFakeQueueBatch())
110+
return WithQueueBatch(configoptional.None[queuebatch.Config](), newFakeQueueBatch())
114111
}(),
115112
},
116113
},

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ import (
1717

1818
// NewDefaultQueueConfig returns the default config for queuebatch.Config.
1919
// By default, the queue stores 1000 requests of telemetry and is non-blocking when full.
20-
func NewDefaultQueueConfig() queuebatch.Config {
21-
return queuebatch.Config{
22-
Enabled: true,
20+
func NewDefaultQueueConfig() configoptional.Optional[queuebatch.Config] {
21+
return configoptional.Some(queuebatch.Config{
2322
Sizer: request.SizerTypeRequests,
2423
NumConsumers: 10,
2524
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
@@ -32,7 +31,7 @@ func NewDefaultQueueConfig() queuebatch.Config {
3231
Sizer: request.SizerTypeItems,
3332
MinSize: 8192,
3433
}),
35-
}
34+
})
3635
}
3736

3837
func NewQueueSender(

exporter/exporterhelper/internal/queue_sender_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/config/configoptional"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2021
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -31,8 +32,9 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
3132
}
3233
logger, observed := observer.New(zap.ErrorLevel)
3334
qSet.Telemetry.Logger = zap.New(logger)
35+
qCfg := NewDefaultQueueConfig()
3436
be, err := NewQueueSender(
35-
qSet, NewDefaultQueueConfig(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
37+
qSet, *qCfg.Get(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
3638
require.NoError(t, err)
3739

3840
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -46,14 +48,14 @@ func TestQueueConfig_Validate(t *testing.T) {
4648
qCfg := NewDefaultQueueConfig()
4749
require.NoError(t, qCfg.Validate())
4850

49-
qCfg.NumConsumers = 0
51+
qCfg.Get().NumConsumers = 0
5052
require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive")
5153

5254
qCfg = NewDefaultQueueConfig()
53-
qCfg.QueueSize = 0
55+
qCfg.Get().QueueSize = 0
5456
require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive")
5557

5658
// Confirm Validate doesn't return error with invalid config when feature is disabled
57-
qCfg.Enabled = false
59+
qCfg = configoptional.None[queuebatch.Config]()
5860
assert.NoError(t, qCfg.Validate())
5961
}

exporter/exporterhelper/internal/queuebatch/config.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ import (
1616

1717
// Config defines configuration for queueing and batching incoming requests.
1818
type Config struct {
19-
// Enabled indicates whether to not enqueue and batch before exporting.
20-
Enabled bool `mapstructure:"enabled"`
21-
2219
// WaitForResult determines if incoming requests are blocked until the request is processed or not.
2320
// Currently, this option is not available when persistent queue is configured using the storage configuration.
2421
WaitForResult bool `mapstructure:"wait_for_result"`
@@ -66,10 +63,6 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
6663

6764
// Validate checks if the Config is valid
6865
func (cfg *Config) Validate() error {
69-
if !cfg.Enabled {
70-
return nil
71-
}
72-
7366
if cfg.NumConsumers <= 0 {
7467
return errors.New("`num_consumers` must be positive")
7568
}

0 commit comments

Comments
 (0)