@@ -13,8 +13,6 @@ import (
 
 	"go.uber.org/zap"
 
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/confmap"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -26,10 +24,10 @@ type QueueBatchSettings[K any] struct {
 	Sizers map[request.SizerType]request.Sizer[K]
 }
 
-// NewDefaultQueueConfig returns the default config for QueueConfig.
+// NewDefaultQueueConfig returns the default config for queuebatch.Config.
 // By default, the queue stores 1000 items of telemetry and is non-blocking when full.
-func NewDefaultQueueConfig() QueueConfig {
-	return QueueConfig{
+func NewDefaultQueueConfig() queuebatch.Config {
+	return queuebatch.Config{
 		Enabled:         true,
 		Sizer:           request.SizerTypeRequests,
 		NumConsumers:    10,
@@ -38,94 +36,18 @@ func NewDefaultQueueConfig() QueueConfig {
 		// This default is probably still too high, and may be adjusted further down in a future release
 		QueueSize:       1_000,
 		BlockOnOverflow: false,
+		StorageID:       nil,
+		Batch:           nil,
 	}
 }
 
-// QueueConfig defines configuration for queueing requests before exporting.
-// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
-// Experimental: This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-type QueueConfig struct {
-	// Enabled indicates whether to not enqueue batches before exporting.
-	Enabled bool `mapstructure:"enabled"`
-
-	// WaitForResult determines if incoming requests are blocked until the request is processed or not.
-	// Currently, this option is not available when persistent queue is configured using the storage configuration.
-	WaitForResult bool `mapstructure:"wait_for_result"`
-
-	// Sizer determines the type of size measurement used by this component.
-	// It accepts "requests", "items", or "bytes".
-	Sizer request.SizerType `mapstructure:"sizer"`
-
-	// QueueSize represents the maximum data size allowed for concurrent storage and processing.
-	QueueSize int64 `mapstructure:"queue_size"`
-
-	// NumConsumers is the number of consumers from the queue.
-	NumConsumers int `mapstructure:"num_consumers"`
-
-	// Deprecated: [v0.123.0] use `block_on_overflow`.
-	Blocking bool `mapstructure:"blocking"`
-
-	// BlockOnOverflow determines the behavior when the component's QueueSize limit is reached.
-	// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
-	BlockOnOverflow bool `mapstructure:"block_on_overflow"`
-
-	// StorageID if not empty, enables the persistent storage and uses the component specified
-	// as a storage extension for the persistent queue
-	StorageID *component.ID `mapstructure:"storage"`
-
-	hasBlocking bool
-}
-
-func (qCfg *QueueConfig) Unmarshal(conf *confmap.Conf) error {
-	if err := conf.Unmarshal(qCfg); err != nil {
-		return err
-	}
-
-	// If user still uses the old blocking, override and will log error during initialization.
-	if conf.IsSet("blocking") {
-		qCfg.hasBlocking = true
-		qCfg.BlockOnOverflow = qCfg.Blocking
-	}
-
-	return nil
-}
-
-// Validate checks if the Config is valid
-func (qCfg *QueueConfig) Validate() error {
-	if !qCfg.Enabled {
-		return nil
-	}
-
-	if qCfg.NumConsumers <= 0 {
-		return errors.New("`num_consumers` must be positive")
-	}
-
-	if qCfg.QueueSize <= 0 {
-		return errors.New("`queue_size` must be positive")
-	}
-
-	if qCfg.StorageID != nil && qCfg.WaitForResult {
-		return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
-	}
-
-	// Only support request sizer for persistent queue at this moment.
-	if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
-		return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
-	}
-	return nil
-}
-
 func NewQueueSender(
 	qSet queuebatch.Settings[request.Request],
-	qCfg QueueConfig,
+	qCfg queuebatch.Config,
 	bCfg BatcherConfig,
 	exportFailureMessage string,
 	next sender.Sender[request.Request],
 ) (sender.Sender[request.Request], error) {
-	if qCfg.hasBlocking {
-		qSet.Telemetry.Logger.Error("using deprecated field `blocking`")
-	}
 	exportFunc := func(ctx context.Context, req request.Request) error {
 		// Have to read the number of items before sending the request since the request can
 		// be modified by the downstream components like the batcher.
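For orientation, here is a minimal sketch of how a caller might use the new return type of NewDefaultQueueConfig. The function name and the queuebatch.Config field names come from the hunks above; the helper itself and the chosen values are illustrative assumptions, not part of this change.

// Hypothetical helper, assumed to live in the same package as the diff so that
// NewDefaultQueueConfig and the queuebatch import are already available.
func newCustomQueueConfig() queuebatch.Config {
	qCfg := NewDefaultQueueConfig() // defaults from above: 1_000 items, 10 consumers, non-blocking
	qCfg.BlockOnOverflow = true     // wait for space instead of returning a retryable error when full
	qCfg.QueueSize = 5_000          // illustrative bump over the 1_000-item default
	return qCfg
}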
@@ -141,47 +63,41 @@ func NewQueueSender(
 	return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
 }
 
-func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config {
-	var qbCfg queuebatch.Config
+func newQueueBatchConfig(qCfg queuebatch.Config, bCfg BatcherConfig) queuebatch.Config {
+	// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
+	// TODO: Remove this when WithBatcher is removed.
+	if !bCfg.Enabled {
+		return qCfg
+	}
+
 	// User configured queueing, copy all config.
 	if qCfg.Enabled {
-		qbCfg = queuebatch.Config{
-			Enabled:         true,
-			WaitForResult:   qCfg.WaitForResult,
-			Sizer:           qCfg.Sizer,
-			QueueSize:       qCfg.QueueSize,
-			NumConsumers:    qCfg.NumConsumers,
-			BlockOnOverflow: qCfg.BlockOnOverflow,
-			StorageID:       qCfg.StorageID,
-			// TODO: Copy batching configuration as well when available.
-		}
+		// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
 		// TODO: Remove this when WithBatcher is removed.
-		if bCfg.Enabled {
-			qbCfg.Batch = &queuebatch.BatchConfig{
-				FlushTimeout: bCfg.FlushTimeout,
-				MinSize:      bCfg.MinSize,
-				MaxSize:      bCfg.MaxSize,
-			}
-		}
-	} else {
-		// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
-		// TODO: Remove this when WithBatcher is removed.
-		qbCfg = queuebatch.Config{
-			Enabled:         true,
-			WaitForResult:   true,
-			Sizer:           request.SizerTypeRequests,
-			QueueSize:       math.MaxInt,
-			NumConsumers:    runtime.NumCPU(),
-			BlockOnOverflow: true,
-			StorageID:       nil,
-			Batch: &queuebatch.BatchConfig{
-				FlushTimeout: bCfg.FlushTimeout,
-				MinSize:      bCfg.MinSize,
-				MaxSize:      bCfg.MaxSize,
-			},
+		qCfg.Batch = &queuebatch.BatchConfig{
+			FlushTimeout: bCfg.FlushTimeout,
+			MinSize:      bCfg.MinSize,
+			MaxSize:      bCfg.MaxSize,
 		}
+		return qCfg
+	}
+
+	// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
+	// TODO: Remove this when WithBatcher is removed.
+	return queuebatch.Config{
+		Enabled:         true,
+		WaitForResult:   true,
+		Sizer:           request.SizerTypeRequests,
+		QueueSize:       math.MaxInt,
+		NumConsumers:    runtime.NumCPU(),
+		BlockOnOverflow: true,
+		StorageID:       nil,
+		Batch: &queuebatch.BatchConfig{
+			FlushTimeout: bCfg.FlushTimeout,
+			MinSize:      bCfg.MinSize,
+			MaxSize:      bCfg.MaxSize,
+		},
 	}
-	return qbCfg
 }
 
 // BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items.
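Read together, the rewritten newQueueBatchConfig reduces to three cases. The sketch below only restates the branches visible in the hunk above, assumes the same package, and uses describeQueueBatchConfig as a hypothetical wrapper purely for illustration.

// Hypothetical wrapper: calls the real newQueueBatchConfig and annotates each branch.
func describeQueueBatchConfig(qCfg queuebatch.Config, bCfg BatcherConfig) queuebatch.Config {
	cfg := newQueueBatchConfig(qCfg, bCfg)
	switch {
	case !bCfg.Enabled:
		// Legacy batcher off: cfg is exactly the qCfg the user supplied.
	case qCfg.Enabled:
		// Legacy batcher and queue both enabled: cfg is qCfg with Batch overwritten from bCfg.
	default:
		// Legacy batcher on, queue disabled: cfg is a synthetic blocking, request-sized,
		// effectively unbounded queue so batching still takes place.
	}
	return cfg
}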