Merged

Commits (39 total; this view shows changes from 9 commits)
6128da3
Remove batcher and related config in favor of sending queue
lahsivjar Sep 16, 2025
459afae
update readme
lahsivjar Sep 18, 2025
6c6b9c5
Add changelog
lahsivjar Sep 18, 2025
44aa720
fix lint
lahsivjar Sep 19, 2025
c88d9c6
Fix lint in integration tests
lahsivjar Sep 19, 2025
2b8caa5
Merge branch 'main' into remove-es-async
lahsivjar Sep 19, 2025
1465da8
Update exporter/elasticsearchexporter/README.md
lahsivjar Sep 23, 2025
5e6e752
Update readme and change batching defaults
lahsivjar Sep 23, 2025
bff9dbb
Merge branch 'main' into remove-es-async
lahsivjar Sep 23, 2025
2869467
Merge branch 'main' into remove-es-async
lahsivjar Sep 23, 2025
3bb6e62
Add message for deprecated config
lahsivjar Sep 23, 2025
fa5bd51
go mod tidy
lahsivjar Sep 23, 2025
f9a5f30
make lint
lahsivjar Sep 23, 2025
1aec3c7
Fix integration tests
lahsivjar Sep 23, 2025
41d1a4e
Merge branch 'main' into remove-es-async
lahsivjar Sep 23, 2025
eaaa5c3
Update exporter/elasticsearchexporter/README.md
lahsivjar Sep 24, 2025
4b51441
Update reademe
lahsivjar Sep 24, 2025
4e694b9
Update exporter/elasticsearchexporter/README.md
lahsivjar Sep 24, 2025
d5c3685
Update exporter/elasticsearchexporter/README.md
lahsivjar Sep 24, 2025
b93b35c
Update exporter/elasticsearchexporter/README.md
lahsivjar Sep 24, 2025
9ec3412
Update exporter/elasticsearchexporter/config.go
lahsivjar Sep 24, 2025
adcd6ce
Update exporter/elasticsearchexporter/config.go
lahsivjar Sep 24, 2025
72f2e10
Merge branch 'main' into remove-es-async
lahsivjar Sep 24, 2025
382a623
Fix copy pasta errors
lahsivjar Sep 25, 2025
1db0f2b
Do not ignore num_workers and flush settings
lahsivjar Sep 26, 2025
a86f49a
Merge branch 'main' into remove-es-async
lahsivjar Sep 26, 2025
9f3c30e
Update exporter/elasticsearchexporter/README.md
lahsivjar Oct 1, 2025
19e215f
Update exporter/elasticsearchexporter/README.md
lahsivjar Oct 1, 2025
6717fc3
Update exporter/elasticsearchexporter/README.md
lahsivjar Oct 1, 2025
fe36c03
Update exporter/elasticsearchexporter/README.md
lahsivjar Oct 1, 2025
cd1be4f
Account for pdata<>ndjson size discrepency
lahsivjar Oct 2, 2025
19af7b0
Use block on overflow as true to be the default
lahsivjar Oct 3, 2025
51747c2
Fix tests for block_on_overflow set to true
lahsivjar Oct 3, 2025
37b48f5
fix readme
lahsivjar Oct 3, 2025
76b8c2d
Merge branch 'main' into remove-es-async
lahsivjar Oct 3, 2025
95c08ad
Merge branch 'main' into remove-es-async
lahsivjar Oct 6, 2025
10ff125
Merge branch 'main' into remove-es-async
lahsivjar Oct 6, 2025
d0ac698
Merge branch 'main' into remove-es-async
lahsivjar Oct 7, 2025
e4462c6
slight change to changelog wordings
lahsivjar Oct 7, 2025
25 changes: 17 additions & 8 deletions exporter/elasticsearchexporter/README.md
@@ -94,12 +94,12 @@ sending_queue:
queue_size: 10
batch:
flush_timeout: 10s
-min_size: 5e+6 # 5MB
-max_size: 10e+6 # 10MB
+min_size: 1e+6 # 1MB
+max_size: 5e+6 # 5MB
sizer: bytes
```

-The default configurations are chosen to be closer to the previous defaults with the exporter's inbuilt batching. The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings.
+The defaults are chosen to be close to those of the exporter's previous inbuilt batching. The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings.

### Elasticsearch document routing

@@ -292,8 +292,8 @@ This can be configured through the following settings:
The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents.
The behaviour of this bulk indexing can be configured with the following settings:

-- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. Note this is not applicable if `batcher::enabled` is `true` or `false`.
-- `flush`: Event bulk indexer buffer flush settings
+- `num_workers` (default=runtime.NumCPU()): Deprecated and ignored; use the `sending_queue` config instead. Number of workers publishing bulk requests concurrently. Note this is not applicable if `batcher::enabled` is `true` or `false`.
+- `flush`: Deprecated and ignored; use the `sending_queue` config instead. Event bulk indexer buffer flush settings:
- `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB.
- `interval` (default=10s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
@@ -303,9 +303,18 @@ The behaviour of this bulk indexing can be configured with the following settings:
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.

-> [!NOTE]
-> The `flush::interval` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`.
+- `sending_queue`: Configures the queueing and batching behaviour; see the example configuration below. The defaults are listed here (they may differ from the upstream defaults); for the full set of options, check the [exporterhelper docs][exporterhelper].
+  - `enabled` (default=true): Enable queueing and batching behaviour.
+  - `num_consumers` (default=10): Number of consumers that dequeue batches.
+  - `wait_for_result` (default=false): If `true`, blocks incoming requests until they are processed.
+  - `block_on_overflow` (default=false): If `true`, blocks the request until the queue has space.
+  - `sizer` (default=requests): Measure queue capacity in requests.
+  - `queue_size` (default=10): Maximum number of requests the queue can hold.
+  - `batch`:
+    - `flush_timeout` (default=10s): Time after which a batch is exported irrespective of its size.
+    - `sizer` (default=bytes): Size batches by bytes.
+    - `min_size` (default=1MB): Minimum size of a batch.
+    - `max_size` (default=5MB): Maximum size of a batch.
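
For illustration, the defaults documented above correspond to an exporter configuration like the following sketch (the endpoint value is a placeholder, not part of this PR):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elasticsearch:9200 # placeholder
    sending_queue:
      enabled: true
      num_consumers: 10
      wait_for_result: false
      block_on_overflow: false
      sizer: requests
      queue_size: 10
      batch:
        flush_timeout: 10s
        sizer: bytes
        min_size: 1e+6 # 1MB
        max_size: 5e+6 # 5MB
```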

#### Bulk indexing error response

6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/config.go
@@ -476,6 +476,12 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
if cfg.TracesDynamicIndex.Enabled {
logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.")
}
+if cfg.Flush.Bytes > 0 || cfg.Flush.Interval > 0 {
+	logger.Warn("flush settings are now deprecated and ignored. Please use `sending_queue` instead.")
+}
+if cfg.NumWorkers > 0 {
+	logger.Warn("`num_workers` is now deprecated and ignored. Please use `sending_queue` instead.")
+}
}

func handleTelemetryConfig(cfg *Config, logger *zap.Logger) {
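To illustrate the migration these warnings point to, here is a sketch. The mapping of the deprecated `flush::bytes`/`flush::interval` onto `batch::max_size`/`batch::flush_timeout`, and of `num_workers` onto `num_consumers`, is an approximation for illustration, not an equivalence stated by this PR:

```yaml
# Before: deprecated settings, now ignored (warnings are logged at startup)
elasticsearch:
  num_workers: 4
  flush:
    bytes: 5000000
    interval: 10s
---
# After: roughly equivalent sending_queue settings
elasticsearch:
  sending_queue:
    enabled: true
    num_consumers: 4
    batch:
      sizer: bytes
      max_size: 5e+6
      flush_timeout: 10s
```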
12 changes: 6 additions & 6 deletions exporter/elasticsearchexporter/config_test.go
@@ -65,8 +65,8 @@ func TestConfig(t *testing.T) {
Batch: configoptional.Some(exporterhelper.BatchConfig{
FlushTimeout: 10 * time.Second,
Sizer: exporterhelper.RequestSizerTypeBytes,
-MinSize: 5000000,
-MaxSize: 10000000,
+MinSize: 1000000,
+MaxSize: 5000000,
}),
},
Endpoints: []string{
@@ -148,8 +148,8 @@ func TestConfig(t *testing.T) {
Batch: configoptional.Some(exporterhelper.BatchConfig{
FlushTimeout: 10 * time.Second,
Sizer: exporterhelper.RequestSizerTypeBytes,
-MinSize: 5000000,
-MaxSize: 10000000,
+MinSize: 1000000,
+MaxSize: 5000000,
}),
},
Endpoints: []string{"http://localhost:9200"},
@@ -221,8 +221,8 @@ func TestConfig(t *testing.T) {
Batch: configoptional.Some(exporterhelper.BatchConfig{
FlushTimeout: 10 * time.Second,
Sizer: exporterhelper.RequestSizerTypeBytes,
-MinSize: 5000000,
-MaxSize: 10000000,
+MinSize: 1000000,
+MaxSize: 5000000,
}),
},
Endpoints: []string{"http://localhost:9200"},
20 changes: 6 additions & 14 deletions exporter/elasticsearchexporter/factory.go
@@ -46,8 +46,8 @@ func createDefaultConfig() component.Config {
qs.QueueSize = 10
qs.Batch = configoptional.Some(exporterhelper.BatchConfig{
**lahsivjar (Member Author):**

I enabled batching by default, but I am a bit conflicted on this. There were 2 main reasons that I considered for making this decision:

  1. To keep the default closer to what we had with async mode.
  2. To have better defaults w.r.t. performance.

The main issue with having batching by default is that the configoptional doesn't support it that well (see open-telemetry/opentelemetry-collector#13894) - basically, if default batching is enabled, then it is not possible to disable batching. We could reach a behavior closer to no batching by setting min_size to 0 but it is not the same as NO batching. I am considering introducing a breaking change to disable batching by default and have users enable batching with the default configuration by setting:

```yaml
sending_queue:
  batch:
```

@axw @carsonip WDYT?

**Contributor:**

Maybe we could enable it by default for backwards compatibility, but add a feature gate to phase out that behaviour? i.e. the feature gate would make it disabled by default.

**Contributor:**

Not being able to disable batching is unfortunate. What's the desired end state here? To have the sending queue and batching both enabled by default, or not?

**lahsivjar (Member Author):**

> Maybe we could enable it by default for backwards compatibility

This means we will give up on being able to disable batching - currently, it cannot be achieved.

> but add a feature gate to phase out that behaviour? i.e. the feature gate would make it disabled by default.

Sounds reasonable, if we can accept not being able to disable the batching behaviour in default mode.

**Contributor:**

> This means we will give up on being able to disable batching - currently, it cannot be achieved

But we would by setting the feature gate, right? So there's still an option, and we preserve the same default behaviour in the transition.

**Contributor:**

If we keep `min_size` at 0 and the flush interval close to 1ns, wouldn't that be close to no batching? Can we guide users toward setting that instead of gates?

++ on providing better defaults optimized for performance, as that would be ideal for anyone using the exporter for benchmarking etc.

**lahsivjar (Member Author):**

> The main issue with having batching by default is that the configoptional doesn't support it that well (see open-telemetry/opentelemetry-collector#13894) - basically, if default batching is enabled, then it is not possible to disable batching.

@axw @carsonip Based on the discussions in open-telemetry/opentelemetry-collector#13894, I think keeping batching as the default behaviour is the better option (per the reasoning in my original comment). Eventually, upstream will add a way to disable configoptional configs, and our concern will be resolved. Meanwhile, we have two options:

  1. Add a temporary feature flag to disable `sending_queue::batch` - i.e., keep the feature flag until upstream allows disabling config optionals. This approach to the feature flag is slightly different from what we discussed above.
  2. Document that setting `min_size` to 0 effectively disables batching.

WDYT? Any preferences here?

**Contributor:**

My vote is for option 2, to reduce friction and to keep batching as the default behaviour.

**Contributor:**

(2) should be fine as I don't think it'll impact a lot of use cases.

**Contributor:**

I'm good with option 2.
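
For concreteness, option 2 corresponds to a configuration sketch like the one below, mirroring the integration-test config added in this PR; whether a zero `min_size` behaves identically to no batching is exactly the caveat raised earlier in this thread:

```yaml
exporters:
  elasticsearch:
    sending_queue:
      enabled: true
      batch:
        sizer: bytes
        min_size: 0 # effectively disables size-based batching
```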

FlushTimeout: 10 * time.Second,
-MinSize: 5e+6,
-MaxSize: 10e+6,
+MinSize: 1e+6,
+MaxSize: 5e+6,
Sizer: exporterhelper.RequestSizerTypeBytes,
})

@@ -220,19 +220,11 @@ func exporterhelperOptions(
) []exporterhelper.Option {
// not setting capabilities as they will default to non-mutating but will be updated
// by the base-exporter to mutating if batching is enabled.
-opts := []exporterhelper.Option{
+return []exporterhelper.Option{
exporterhelper.WithStart(start),
exporterhelper.WithShutdown(shutdown),
-exporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs),
+xexporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs),
+// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
+exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
}

-// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
-//
-// We keep timeout_sender enabled in the async mode (sending_queue not enabled OR sending
-// queue enabled but batching not enabled OR based on the deprecated batcher setting), to
-// ensure sending data to the background workers will not block indefinitely.
-if cfg.QueueBatchConfig.Enabled {
-	opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}))
-}
-return opts
}
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/go.mod
@@ -26,7 +26,6 @@ require (
go.opentelemetry.io/collector/config/configoptional v0.136.0
go.opentelemetry.io/collector/confmap v1.42.0
go.opentelemetry.io/collector/confmap/xconfmap v0.136.0
-go.opentelemetry.io/collector/consumer v1.42.0
go.opentelemetry.io/collector/consumer/consumererror v0.136.0
go.opentelemetry.io/collector/exporter v1.42.0
go.opentelemetry.io/collector/exporter/exporterhelper v0.136.0
@@ -88,6 +87,7 @@ require (
go.opentelemetry.io/collector/config/configmiddleware v1.42.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.42.0 // indirect
go.opentelemetry.io/collector/config/configtls v1.42.0 // indirect
+go.opentelemetry.io/collector/consumer v1.42.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.136.0 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.136.0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.136.0 // indirect
34 changes: 19 additions & 15 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
@@ -67,7 +67,7 @@ type esDataReceiver struct {
receiver receiver.Logs
endpoint string
decodeBulkRequest bool
-batcherEnabled *bool
+enableBatching bool
t testing.TB
}

@@ -92,9 +92,9 @@ func withDecodeBulkRequest(decode bool) dataReceiverOption {
}
}

-func withBatcherEnabled(enabled bool) dataReceiverOption {
+func withBatching(enabled bool) dataReceiverOption {
return func(r *esDataReceiver) {
-r.batcherEnabled = &enabled
+r.enableBatching = enabled
}
}

@@ -153,9 +153,6 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
logs_index: %s
metrics_index: %s
traces_index: %s
-sending_queue:
-  enabled: true
-  block_on_overflow: true
mapping:
mode: otel
retry:
@@ -169,17 +166,24 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex,
)

-if es.batcherEnabled == nil {
+if es.enableBatching {
cfgFormat += `
-flush:
-  interval: 1s`
+sending_queue:
+  enabled: true
+  block_on_overflow: true
+  batch:
+    flush_timeout: 1s
+    sizer: bytes`
} else {
-cfgFormat += fmt.Sprintf(`
-batcher:
-  flush_timeout: 1s
-  enabled: %v`,
-	*es.batcherEnabled,
-)
+// Batching is enabled by default, so it is effectively disabled here by
+// setting `min_size` to 0.
+cfgFormat += `
+sending_queue:
+  enabled: true
+  block_on_overflow: true
+  batch:
+    min_size: 0
+    sizer: bytes`
}
return cfgFormat + "\n"
}
28 changes: 8 additions & 20 deletions exporter/elasticsearchexporter/integrationtest/exporter_test.go
@@ -21,11 +21,7 @@ func TestExporter(t *testing.T) {
for _, tc := range []struct {
name string

-// batcherEnabled enables/disables the batch sender. If this is
-// nil, then the exporter buffers data itself (legacy behavior),
-// whereas if it is non-nil then the exporter will not perform
-// any buffering itself.
-batcherEnabled *bool
+enableBatching bool

// restartCollector restarts the OTEL collector. Restarting
// the collector allows durability testing of the ES exporter
@@ -37,24 +33,20 @@
{name: "es_intermittent_http_error", mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
{name: "es_intermittent_doc_error", mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},

{name: "batcher_enabled", batcherEnabled: ptrTo(true)},
{name: "batcher_enabled_es_intermittent_http_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
{name: "batcher_enabled_es_intermittent_doc_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},
{name: "batcher_disabled", batcherEnabled: ptrTo(false)},
{name: "batcher_disabled_es_intermittent_http_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
{name: "batcher_disabled_es_intermittent_doc_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},
{name: "enable sending_queue batching", enableBatching: true},
{name: "batcher_enabled_es_intermittent_http_error", enableBatching: true, mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
{name: "batcher_enabled_es_intermittent_doc_error", enableBatching: true, mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},
{name: "batcher_disabled", enableBatching: false},
{name: "batcher_disabled_es_intermittent_http_error", enableBatching: false, mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
{name: "batcher_disabled_es_intermittent_doc_error", enableBatching: false, mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},

/* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed
{name: "collector_restarts", restartCollector: true},
{name: "collector_restart_with_es_intermittent_failure", mockESErr: true, restartCollector: true},
*/
} {
t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) {
-var opts []dataReceiverOption
-if tc.batcherEnabled != nil {
-	opts = append(opts, withBatcherEnabled(*tc.batcherEnabled))
-}
-runner(t, eventType, tc.restartCollector, tc.mockESErr, opts...)
+runner(t, eventType, tc.restartCollector, tc.mockESErr, withBatching(tc.enableBatching))
})
}
}
@@ -145,7 +137,3 @@ func runner(t *testing.T, eventType string, restartCollector bool, mockESErr err
)
tc.ValidateData()
}

-func ptrTo[T any](t T) *T {
-	return &t
-}