From 6128da3e39b3fcf83656bd3036275f8a8594ae7c Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 16 Sep 2025 21:22:57 +0100 Subject: [PATCH 01/29] Remove batcher and related config in favor of sending queue Remove batcher and introduce to sending_queue::batch as defaults --- exporter/elasticsearchexporter/bulkindexer.go | 172 +---- .../elasticsearchexporter/bulkindexer_test.go | 701 +----------------- exporter/elasticsearchexporter/config.go | 62 +- exporter/elasticsearchexporter/config_test.go | 122 ++- .../elasticsearchexporter/exporter_test.go | 121 +-- exporter/elasticsearchexporter/factory.go | 74 +- .../elasticsearchexporter/factory_test.go | 40 - .../testdata/config.yaml | 36 +- 8 files changed, 197 insertions(+), 1131 deletions(-) diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index f3efd8fd743f3..85b2dc8d8cd30 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -71,10 +71,7 @@ func newBulkIndexer( tb *metadata.TelemetryBuilder, logger *zap.Logger, ) (bulkIndexer, error) { - if config.Batcher.enabledSet || (config.QueueBatchConfig.Enabled && config.QueueBatchConfig.Batch.HasValue()) { - return newSyncBulkIndexer(client, config, requireDataStream, tb, logger), nil - } - return newAsyncBulkIndexer(client, config, requireDataStream, tb, logger) + return newSyncBulkIndexer(client, config, requireDataStream, tb, logger), nil } func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig { @@ -121,7 +118,6 @@ func newSyncBulkIndexer( return &syncBulkIndexer{ config: bulkIndexerConfig(client, config, requireDataStream), flushTimeout: config.Timeout, - flushBytes: config.Flush.Bytes, retryConfig: config.Retry, metadataKeys: config.MetadataKeys, telemetryBuilder: tb, @@ -133,7 +129,6 @@ func newSyncBulkIndexer( type syncBulkIndexer struct { config docappender.BulkIndexerConfig flushTimeout time.Duration - flushBytes int retryConfig RetrySettings metadataKeys []string telemetryBuilder *metadata.TelemetryBuilder @@ -185,11 +180,6 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index, docID, pipeline getAttributesFromMetadataKeys(ctx, s.s.metadataKeys)...), ), ) - // flush bytes should operate on uncompressed length - // as Elasticsearch http.max_content_length measures uncompressed length. 
- if s.bi.UncompressedLen() >= s.s.flushBytes { - return s.Flush(ctx) - } return nil } @@ -237,166 +227,6 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error { } } -func newAsyncBulkIndexer( - client esapi.Transport, - config *Config, - requireDataStream bool, - tb *metadata.TelemetryBuilder, - logger *zap.Logger, -) (*asyncBulkIndexer, error) { - numWorkers := config.NumWorkers - if numWorkers == 0 { - numWorkers = runtime.NumCPU() - } - - pool := &asyncBulkIndexer{ - wg: sync.WaitGroup{}, - items: make(chan docappender.BulkIndexerItem, config.NumWorkers), - telemetryBuilder: tb, - } - pool.wg.Add(numWorkers) - - for i := 0; i < numWorkers; i++ { - bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config, requireDataStream)) - if err != nil { - return nil, err - } - w := asyncBulkIndexerWorker{ - indexer: bi, - items: pool.items, - flushInterval: config.Flush.Interval, - flushTimeout: config.Timeout, - flushBytes: config.Flush.Bytes, - telemetryBuilder: tb, - logger: logger, - failedDocsInputLogger: newFailedDocsInputLogger(logger, config), - } - go func() { - defer pool.wg.Done() - w.run() - }() - } - return pool, nil -} - -type asyncBulkIndexer struct { - items chan docappender.BulkIndexerItem - wg sync.WaitGroup - telemetryBuilder *metadata.TelemetryBuilder -} - -type asyncBulkIndexerSession struct { - *asyncBulkIndexer -} - -// StartSession returns a new asyncBulkIndexerSession. -func (a *asyncBulkIndexer) StartSession(context.Context) bulkIndexerSession { - return asyncBulkIndexerSession{a} -} - -// Close closes the asyncBulkIndexer and any active sessions. -func (a *asyncBulkIndexer) Close(ctx context.Context) error { - close(a.items) - doneCh := make(chan struct{}) - go func() { - a.wg.Wait() - close(doneCh) - }() - select { - case <-ctx.Done(): - return ctx.Err() - case <-doneCh: - return nil - } -} - -// Add adds an item to the async bulk indexer session. -// -// Adding an item after a call to Close() will panic. -func (s asyncBulkIndexerSession) Add(ctx context.Context, index, docID, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error { - item := docappender.BulkIndexerItem{ - Index: index, - Body: document, - DocumentID: docID, - DynamicTemplates: dynamicTemplates, - Action: action, - Pipeline: pipeline, - } - select { - case <-ctx.Done(): - return ctx.Err() - case s.items <- item: - } - s.telemetryBuilder.ElasticsearchDocsReceived.Add(ctx, 1) - return nil -} - -// End is a no-op. -func (asyncBulkIndexerSession) End() { -} - -// Flush is a no-op. -func (asyncBulkIndexerSession) Flush(context.Context) error { - return nil -} - -type asyncBulkIndexerWorker struct { - indexer *docappender.BulkIndexer - items <-chan docappender.BulkIndexerItem - flushInterval time.Duration - flushTimeout time.Duration - flushBytes int - - logger *zap.Logger - failedDocsInputLogger *zap.Logger - telemetryBuilder *metadata.TelemetryBuilder -} - -func (w *asyncBulkIndexerWorker) run() { - flushTick := time.NewTicker(w.flushInterval) - defer flushTick.Stop() - for { - select { - case item, ok := <-w.items: - // if channel is closed, flush and return - if !ok { - w.flush() - return - } - - if err := w.indexer.Add(item); err != nil { - w.logger.Error("error adding item to bulk indexer", zap.Error(err)) - } - - // flush bytes should operate on uncompressed length - // as Elasticsearch http.max_content_length measures uncompressed length. 
- if w.indexer.UncompressedLen() >= w.flushBytes { - w.flush() - flushTick.Reset(w.flushInterval) - } - case <-flushTick.C: - // bulk indexer needs to be flushed every flush interval because - // there may be pending bytes in bulk indexer buffer due to e.g. document level 429 - w.flush() - } - } -} - -func (w *asyncBulkIndexerWorker) flush() { - // TODO (lahsivjar): Should use proper context else client metadata will not be accessible - ctx := context.Background() - // ignore error as we they should be already logged and for async we don't propagate errors - _ = flushBulkIndexer( - ctx, - w.indexer, - w.flushTimeout, - nil, // async bulk indexer cannot propagate client context/metadata - w.telemetryBuilder, - w.logger, - w.failedDocsInputLogger, - ) -} - func flushBulkIndexer( ctx context.Context, bi *docappender.BulkIndexer, diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 5e7fee5af2868..58f28eec89a0a 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -4,13 +4,11 @@ package elasticsearchexporter import ( - "errors" "io" "net/http" "strings" "sync/atomic" "testing" - "time" "github.com/elastic/go-docappender/v2" "github.com/elastic/go-elasticsearch/v8" @@ -18,9 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configcompression" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -63,562 +59,7 @@ const successResp = `{ ] }` -func TestAsyncBulkIndexer_flushOnClose(t *testing.T) { - cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}} - client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }}) - require.NoError(t, err) - - runBulkIndexerOnce(t, &cfg, client) -} - -func TestAsyncBulkIndexer_flush(t *testing.T) { - tests := []struct { - name string - config Config - }{ - { - name: "flush.bytes", - config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}}, - }, - { - name: "flush.interval", - config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: 50 * time.Millisecond, Bytes: 2 << 30}}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }}) - require.NoError(t, err) - - ct := componenttest.NewTelemetry() - tb, err := metadata.NewTelemetryBuilder( - metadatatest.NewSettings(ct).TelemetrySettings, - ) - require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(client, &tt.config, false, tb, zap.NewNop()) - require.NoError(t, err) - - session := bulkIndexer.StartSession(t.Context()) - 
assert.NoError(t, session.Add(t.Context(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - // should flush - time.Sleep(100 * time.Millisecond) - assert.NoError(t, session.Flush(t.Context())) - session.End() - assert.NoError(t, bulkIndexer.Close(t.Context())) - // Assert internal telemetry metrics - metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ - {Value: 1}, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ - {Value: 43}, // hard-coding the flush bytes since the input is fixed - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedBytes(t, ct, []metricdata.DataPoint[int64]{ - {Value: 43}, // hard-coding the flush bytes since the input is fixed - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) - }) - } -} - -func TestAsyncBulkIndexer_flush_error(t *testing.T) { - tests := []struct { - name string - roundTripFunc func(*http.Request) (*http.Response, error) - logFailedDocsInput bool - retrySettings RetrySettings - wantMessage string - wantFields []zap.Field - wantESBulkReqs *metricdata.DataPoint[int64] - wantESDocsProcessed *metricdata.DataPoint[int64] - wantESDocsRetried *metricdata.DataPoint[int64] - wantESLatency *metricdata.HistogramDataPoint[float64] - }{ - { - name: "500", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusInternalServerError, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader("error")), - }, nil - }, - wantMessage: "bulk indexer flush error", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - semconv.HTTPResponseStatusCode(500), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - semconv.HTTPResponseStatusCode(500), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - semconv.HTTPResponseStatusCode(500), - ), - }, - }, - { - name: "429", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusTooManyRequests, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader("error")), - }, nil - }, - wantMessage: "bulk indexer flush error", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "too_many"), - 
semconv.HTTPResponseStatusCode(429), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "too_many"), - semconv.HTTPResponseStatusCode(429), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "too_many"), - semconv.HTTPResponseStatusCode(429), - ), - }, - }, - { - name: "429/with_retry", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":"test","status":429}}]}`)), - }, nil - }, - retrySettings: RetrySettings{Enabled: true, MaxRetries: 5, RetryOnStatus: []int{429}}, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsRetried: &metricdata.DataPoint[int64]{Value: 1}, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "500/doc_level", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":"test","status":500,"error":{"type":"internal_server_error","reason":""}}}]}`)), - }, nil - }, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - attribute.String("error.type", "internal_server_error"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "transport error", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return nil, errors.New("transport error") - }, - wantMessage: "bulk indexer flush error", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "internal_server_error"), - semconv.HTTPResponseStatusCode(http.StatusInternalServerError), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "internal_server_error"), - semconv.HTTPResponseStatusCode(http.StatusInternalServerError), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "internal_server_error"), - semconv.HTTPResponseStatusCode(http.StatusInternalServerError), - ), - }, - }, - { - name: "known version conflict error", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - 
`{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)), - }, nil - }, - wantMessage: "failed to index document", - wantFields: []zap.Field{zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs")}, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_client"), - attribute.String("error.type", "version_conflict_engine_exception"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "known version conflict error with logFailedDocsInput", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)), - }, nil - }, - logFailedDocsInput: true, - wantMessage: "failed to index document; input may contain sensitive data", - wantFields: []zap.Field{ - zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs"), - zap.String("input", `{"create":{"_index":"foo"}} -{"foo": "bar"} -`), - }, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_client"), - attribute.String("error.type", "version_conflict_engine_exception"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "skip profiling version conflict logging", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":".profiling-stackframes-2024.06.01","status":400,"error":{"type":"version_conflict_engine_exception","reason":"document already exists"}}}]}`)), - }, nil - }, - wantMessage: "", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(200), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_client"), - attribute.String("error.type", "version_conflict_engine_exception"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(200), - ), - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - cfg := Config{ - NumWorkers: 1, - Flush: 
FlushSettings{Interval: time.Hour, Bytes: 1}, - Retry: tt.retrySettings, - MetadataKeys: []string{"x-test"}, - } - if tt.logFailedDocsInput { - cfg.LogFailedDocsInput = true - } - esClient, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: tt.roundTripFunc, - }}) - require.NoError(t, err) - core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) - - ct := componenttest.NewTelemetry() - tb, err := metadata.NewTelemetryBuilder( - metadatatest.NewSettings(ct).TelemetrySettings, - ) - require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(esClient, &cfg, false, tb, zap.New(core)) - require.NoError(t, err) - defer bulkIndexer.Close(t.Context()) - - // Client metadata are not added to the telemetry for async bulk indexer - info := client.Info{Metadata: client.NewMetadata(map[string][]string{"x-test": {"test"}})} - ctx := client.NewContext(t.Context(), info) - session := bulkIndexer.StartSession(ctx) - assert.NoError(t, session.Add(ctx, "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - // should flush - time.Sleep(100 * time.Millisecond) - if tt.wantMessage != "" { - messages := observed.FilterMessage(tt.wantMessage) - require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All()) - for _, wantField := range tt.wantFields { - assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All()) - } - } - assert.NoError(t, session.Flush(t.Context())) - session.End() - // Assert internal telemetry metrics - if tt.wantESBulkReqs != nil { - metadatatest.AssertEqualElasticsearchBulkRequestsCount( - t, ct, - []metricdata.DataPoint[int64]{*tt.wantESBulkReqs}, - metricdatatest.IgnoreTimestamp(), - ) - } - if tt.wantESDocsProcessed != nil { - metadatatest.AssertEqualElasticsearchDocsProcessed( - t, ct, - []metricdata.DataPoint[int64]{*tt.wantESDocsProcessed}, - metricdatatest.IgnoreTimestamp(), - ) - } - if tt.wantESDocsRetried != nil { - metadatatest.AssertEqualElasticsearchDocsRetried( - t, ct, - []metricdata.DataPoint[int64]{*tt.wantESDocsRetried}, - metricdatatest.IgnoreTimestamp(), - ) - } - if tt.wantESLatency != nil { - metadatatest.AssertEqualElasticsearchBulkRequestsLatency( - t, ct, - []metricdata.HistogramDataPoint[float64]{*tt.wantESLatency}, - metricdatatest.IgnoreTimestamp(), - metricdatatest.IgnoreValue(), - ) - } - }) - } -} - -func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { - tests := []struct { - name string - config Config - }{ - { - name: "compression none", - config: Config{ - NumWorkers: 1, - ClientConfig: confighttp.ClientConfig{Compression: "none"}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - }, - { - name: "compression gzip", - config: Config{ - NumWorkers: 1, - ClientConfig: confighttp.ClientConfig{Compression: "gzip"}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - }, - { - name: "compression gzip - level 5", - config: Config{ - NumWorkers: 1, - ClientConfig: confighttp.ClientConfig{Compression: "gzip", CompressionParams: configcompression.CompressionParams{Level: 5}}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - loggerCore, logObserver := observer.New(zap.DebugLevel) - - esLogger := clientLogger{ - Logger: zap.New(loggerCore), - logRequestBody: true, - logResponseBody: true, - } - - client, err := 
elasticsearch.NewClient(elasticsearch.Config{ - Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }, - Logger: &esLogger, - }) - require.NoError(t, err) - - runBulkIndexerOnce(t, &tt.config, client) - - records := logObserver.AllUntimed() - require.Len(t, records, 1) - - assert.Equal(t, "/_bulk", records[0].ContextMap()["path"]) - assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[0].ContextMap()["request_body"]) - assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string)) - }) - } -} - -func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer { - ct := componenttest.NewTelemetry() - tb, err := metadata.NewTelemetryBuilder( - metadatatest.NewSettings(ct).TelemetrySettings, - ) - require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(client, config, false, tb, zap.NewNop()) - require.NoError(t, err) - - session := bulkIndexer.StartSession(t.Context()) - assert.NoError(t, session.Add(t.Context(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - assert.NoError(t, session.Flush(t.Context())) - session.End() - assert.NoError(t, bulkIndexer.Close(t.Context())) - // Assert internal telemetry metrics - metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ - {Value: 1}, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ - {Value: 43}, // hard-coding the flush bytes since the input is fixed - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedBytes( - t, ct, - []metricdata.DataPoint[int64]{{}}, - metricdatatest.IgnoreTimestamp(), - metricdatatest.IgnoreValue(), // compression can change in test, ignore value - ) - - return bulkIndexer -} - -func TestSyncBulkIndexer_flushBytes(t *testing.T) { +func TestSyncBulkIndexer(t *testing.T) { tests := []struct { name string responseBody string @@ -641,8 +82,9 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var reqCnt atomic.Int64 cfg := Config{ - NumWorkers: 1, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1}, + QueueBatchConfig: exporterhelper.QueueBatchConfig{ + NumConsumers: 1, + }, MetadataKeys: []string{"x-test"}, } esClient, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ @@ -672,8 +114,9 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { ctx := client.NewContext(t.Context(), info) session := bi.StartSession(ctx) assert.NoError(t, session.Add(ctx, "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - assert.Equal(t, int64(1), reqCnt.Load()) + assert.Equal(t, int64(0), reqCnt.Load()) // requests will not flush unless flush is called 
explicitly assert.NoError(t, session.Flush(ctx)) + assert.Equal(t, int64(1), reqCnt.Load()) session.End() assert.NoError(t, bi.Close(ctx)) @@ -744,129 +187,11 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { } func TestNewBulkIndexer(t *testing.T) { - for _, tc := range []struct { - name string - config map[string]any - expectSyncBulkIndexer bool - }{ - { - name: "batcher_enabled_unset", - config: map[string]any{ - "batcher": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: false, - }, - { - name: "batcher_enabled=true", - config: map[string]any{ - "batcher": map[string]any{ - "enabled": true, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "batcher_enabled=true", - config: map[string]any{ - "batcher": map[string]any{ - "enabled": false, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "sending_queue_enabled_without_batcher", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - }, - }, - expectSyncBulkIndexer: false, - }, - { - name: "sending_queue__with_batch_enabled", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - "batch": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "sending_queue_disabled_but_batch_configured", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": false, - "batch": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - "batcher": map[string]any{ - // no enabled set - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: false, - }, - { - name: "sending_queue_overrides_batcher", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - "batch": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - "batcher": map[string]any{ - "enabled": true, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "sending_queue_without_batch_with_batcher_enabled", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - }, - "batcher": map[string]any{ - "enabled": true, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - } { - t.Run(tc.name, func(t *testing.T) { - client, err := elasticsearch.NewDefaultClient() - require.NoError(t, err) - cfg := createDefaultConfig() - cm := confmap.NewFromStringMap(tc.config) - require.NoError(t, cm.Unmarshal(cfg)) - - bi, err := newBulkIndexer(client, cfg.(*Config), true, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { bi.Close(t.Context()) }) + client, err := elasticsearch.NewDefaultClient() + require.NoError(t, err) + cfg := createDefaultConfig() - _, ok := bi.(*syncBulkIndexer) - assert.Equal(t, tc.expectSyncBulkIndexer, ok) - }) - } + bi, err := newBulkIndexer(client, cfg.(*Config), true, nil, nil) + require.NoError(t, err) + t.Cleanup(func() { bi.Close(t.Context()) }) } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index b1ba346b0097e..f4533b62ca7e5 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" - "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/exporterhelper" 
"go.uber.org/zap" ) @@ -41,6 +40,9 @@ type Config struct { CloudID string `mapstructure:"cloudid"` // NumWorkers configures the number of workers publishing bulk requests. + // + // Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::num_consumers` + // instead. Num workers will be ignored if defined and will be dropped in future releases. NumWorkers int `mapstructure:"num_workers"` // LogsIndex configures the static index used for document routing for logs. @@ -74,9 +76,12 @@ type Config struct { Authentication AuthenticationSettings `mapstructure:",squash"` Discovery DiscoverySettings `mapstructure:"discover"` Retry RetrySettings `mapstructure:"retry"` - Flush FlushSettings `mapstructure:"flush"` - Mapping MappingsSettings `mapstructure:"mapping"` - LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"` + + // Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::batch` instead. + // Flush settings will be ignored if defined and will be dropped in future releases. + Flush FlushSettings `mapstructure:"flush"` + Mapping MappingsSettings `mapstructure:"mapping"` + LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"` // TelemetrySettings contains settings useful for testing/debugging purposes. // This is experimental and may change at any time. @@ -97,18 +102,6 @@ type Config struct { // Users are expected to sanitize the responses themselves. IncludeSourceOnError *bool `mapstructure:"include_source_on_error"` - // Batcher holds configuration for batching requests based on timeout - // and size-based thresholds. - // - // Batcher is unused by default, in which case Flush will be used. - // If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified), - // then the Flush will be ignored even if Batcher.Enabled is false. - // - // Deprecated: [v0.132.0] This config is now deprecated. Use `sending_queue::batch` instead. - // Batcher config will be ignored if `sending_queue::batch` is defined even if sending queue - // is disabled. - Batcher BatcherConfig `mapstructure:"batcher"` - // Experimental: MetadataKeys defines a list of client.Metadata keys that // will be used as partition keys for when batcher is enabled and will be // added to the exporter's telemetry if defined. The config only applies @@ -122,31 +115,6 @@ type Config struct { MetadataKeys []string `mapstructure:"metadata_keys"` } -// BatcherConfig holds configuration for exporterbatcher. -// -// This is a slightly modified version of exporterbatcher.Config, -// to enable tri-state Enabled: unset, false, true. -type BatcherConfig struct { - Enabled bool `mapstructure:"enabled"` - FlushTimeout time.Duration `mapstructure:"flush_timeout"` - Sizer exporterhelper.RequestSizerType `mapstructure:"sizer"` - MinSize int64 `mapstructure:"min_size"` - MaxSize int64 `mapstructure:"max_size"` - - // enabledSet tracks whether Enabled has been specified. - // If enabledSet is false, the exporter will perform its - // own buffering. - enabledSet bool `mapstructure:"-"` -} - -func (c *BatcherConfig) Unmarshal(conf *confmap.Conf) error { - if err := conf.Unmarshal(c); err != nil { - return err - } - c.enabledSet = conf.IsSet("enabled") - return nil -} - type TelemetrySettings struct { LogRequestBody bool `mapstructure:"log_request_body"` LogResponseBody bool `mapstructure:"log_response_body"` @@ -233,11 +201,17 @@ type DiscoverySettings struct { // FlushSettings defines settings for configuring the write buffer flushing // policy in the Elasticsearch exporter. 
The exporter sends a bulk request with // all events already serialized into the send-buffer. +// +// Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::batch` instead. +// Flush settings will be ignored if defined and will be dropped in future releases. type FlushSettings struct { // Bytes sets the send buffer flushing limit. + // Bytes is now deprecated. Use `sending_queue::batch::{min, max}_size with `bytes` + // sizer to configure batching based on bytes. Bytes int `mapstructure:"bytes"` // Interval configures the max age of a document in the send buffer. + // Interval is now deprecated. Use `sending-queue::batch::flush_timeout` instead. Interval time.Duration `mapstructure:"interval"` // prevent unkeyed literal initialization @@ -502,12 +476,6 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { if cfg.TracesDynamicIndex.Enabled { logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") } - switch { - case cfg.Batcher.enabledSet && cfg.QueueBatchConfig.Batch.HasValue(): - logger.Warn("batcher::enabled and sending_queue::batch both have been set, sending_queue::batch will take preference.") - case cfg.Batcher.enabledSet: - logger.Warn("batcher has been deprecated, and will be removed in a future version. Use sending_queue instead.") - } } func handleTelemetryConfig(cfg *Config, logger *zap.Logger) { diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 755227ff7f8f7..5cf87808f50bf 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -54,16 +54,24 @@ func TestConfig(t *testing.T) { expected: defaultCfg, }, { - id: component.NewIDWithName(metadata.Type, "trace"), configFile: "config.yaml", + id: component.NewIDWithName(metadata.Type, "trace"), expected: &Config{ QueueBatchConfig: exporterhelper.QueueBatchConfig{ - Enabled: false, - NumConsumers: exporterhelper.NewDefaultQueueConfig().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, + Enabled: true, + NumConsumers: 10, + QueueSize: 10, Sizer: exporterhelper.RequestSizerTypeRequests, + Batch: configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + Sizer: exporterhelper.RequestSizerTypeBytes, + MinSize: 5000000, + MaxSize: 10000000, + }), + }, + Endpoints: []string{ + "https://elastic.example.com:9200", }, - Endpoints: []string{"https://elastic.example.com:9200"}, LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, }, @@ -85,12 +93,11 @@ func TestConfig(t *testing.T) { cfg.Timeout = 2 * time.Minute cfg.MaxIdleConns = defaultMaxIdleConns cfg.IdleConnTimeout = defaultIdleConnTimeout - cfg.Headers = map[string]configopaque.String{ - "myheader": "test", - } + cfg.Headers = map[string]configopaque.String{"myheader": "test"} cfg.Compression = defaultCompression cfg.CompressionParams.Level = gzip.BestSpeed - }), + }, + ), Authentication: AuthenticationSettings{ User: "elastic", Password: "search", @@ -99,31 +106,31 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, - Interval: 10 * time.Second, - }, Retry: RetrySettings{ Enabled: true, MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, - RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, + 
RetryOnStatus: []int{ + http.StatusTooManyRequests, + http.StatusInternalServerError, + }, }, Mapping: MappingsSettings{ - Mode: "otel", - AllowedModes: []string{"bodymap", "ecs", "none", "otel", "raw"}, + Mode: "otel", + AllowedModes: []string{ + "bodymap", + "ecs", + "none", + "otel", + "raw", + }, }, LogstashFormat: LogstashFormatSettings{ Enabled: false, PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, TelemetrySettings: TelemetrySettings{ LogFailedDocsInputRateLimit: time.Second, }, @@ -135,9 +142,15 @@ func TestConfig(t *testing.T) { expected: &Config{ QueueBatchConfig: exporterhelper.QueueBatchConfig{ Enabled: true, - NumConsumers: exporterhelper.NewDefaultQueueConfig().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, + NumConsumers: 10, + QueueSize: 10, Sizer: exporterhelper.RequestSizerTypeRequests, + Batch: configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + Sizer: exporterhelper.RequestSizerTypeBytes, + MinSize: 5000000, + MaxSize: 10000000, + }), }, Endpoints: []string{"http://localhost:9200"}, LogsIndex: "my_log_index", @@ -175,10 +188,6 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, - Interval: 10 * time.Second, - }, Retry: RetrySettings{ Enabled: true, MaxRetries: 5, @@ -195,11 +204,6 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, TelemetrySettings: TelemetrySettings{ LogFailedDocsInputRateLimit: time.Second, }, @@ -211,9 +215,15 @@ func TestConfig(t *testing.T) { expected: &Config{ QueueBatchConfig: exporterhelper.QueueBatchConfig{ Enabled: true, - NumConsumers: exporterhelper.NewDefaultQueueConfig().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, + NumConsumers: 10, + QueueSize: 10, Sizer: exporterhelper.RequestSizerTypeRequests, + Batch: configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + Sizer: exporterhelper.RequestSizerTypeBytes, + MinSize: 5000000, + MaxSize: 10000000, + }), }, Endpoints: []string{"http://localhost:9200"}, LogsDynamicIndex: DynamicIndexSetting{ @@ -251,10 +261,6 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, - Interval: 10 * time.Second, - }, Retry: RetrySettings{ Enabled: true, MaxRetries: 5, @@ -271,11 +277,6 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, TelemetrySettings: TelemetrySettings{ LogFailedDocsInputRateLimit: time.Second, }, @@ -305,16 +306,6 @@ func TestConfig(t *testing.T) { cfg.Endpoint = "https://elastic.example.com:9200" }), }, - { - id: component.NewIDWithName(metadata.Type, "batcher_disabled"), - configFile: "config.yaml", - expected: withDefaultConfig(func(cfg *Config) { - cfg.Endpoint = "https://elastic.example.com:9200" - - cfg.Batcher.Enabled = false - cfg.Batcher.enabledSet = true - }), - }, { id: component.NewIDWithName(metadata.Type, "compression_none"), configFile: "config.yaml", @@ -334,46 +325,45 @@ func TestConfig(t *testing.T) { 
}), }, { - id: component.NewIDWithName(metadata.Type, "batcher_minmax_size"), + id: component.NewIDWithName(metadata.Type, "include_source_on_error"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - - cfg.Batcher.MinSize = 100 - cfg.Batcher.MaxSize = 200 + includeSource := true + cfg.IncludeSourceOnError = &includeSource }), }, { - id: component.NewIDWithName(metadata.Type, "include_source_on_error"), + id: component.NewIDWithName(metadata.Type, "metadata_keys"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - includeSource := true - cfg.IncludeSourceOnError = &includeSource + + cfg.MetadataKeys = []string{"x-test-1", "x-test-2"} }), }, { - id: component.NewIDWithName(metadata.Type, "metadata_keys"), + id: component.NewIDWithName(metadata.Type, "sendingqueue_disabled"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - cfg.MetadataKeys = []string{"x-test-1", "x-test-2"} + cfg.QueueBatchConfig.Enabled = false }), }, { - id: component.NewIDWithName(metadata.Type, "queuebatch_enabled"), + id: component.NewIDWithName(metadata.Type, "sendingqueue_enabled"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - cfg.QueueBatchConfig.Enabled = true cfg.QueueBatchConfig.NumConsumers = 100 - cfg.QueueBatchConfig.Sizer = exporterhelper.RequestSizerTypeItems cfg.QueueBatchConfig.Batch = configoptional.Some( exporterhelper.BatchConfig{ Sizer: exporterhelper.RequestSizerTypeItems, - FlushTimeout: 10 * time.Second, + FlushTimeout: time.Second, + MinSize: 1000, + MaxSize: 5000, }, ) }), diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 85787e60e2f7e..620cbe6fed897 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -631,9 +631,9 @@ func TestExporterLogs(t *testing.T) { cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 5 * time.Millisecond - // use sync bulk indexer - cfg.Batcher.Enabled = false - cfg.Batcher.enabledSet = true + // use sync flushing + cfg.QueueBatchConfig.Enabled = true + cfg.QueueBatchConfig.WaitForResult = true }) logs := plog.NewLogs() @@ -675,8 +675,10 @@ func TestExporterLogs(t *testing.T) { cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 5 * time.Millisecond - // use async bulk indexer - cfg.Batcher.enabledSet = false + // use async indexer + cfg.QueueBatchConfig.Enabled = true + cfg.QueueBatchConfig.WaitForResult = false + cfg.QueueBatchConfig.BlockOnOverflow = false }) mustSendLogRecords(t, exporter, plog.NewLogRecord()) // as sync bulk indexer is used, retries are not guaranteed to finish @@ -822,7 +824,6 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "otel" - cfg.Flush.Interval = 50 * time.Millisecond cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 10 * time.Millisecond }) @@ -867,12 +868,12 @@ func TestExporterLogs(t *testing.T) { cfgs := map[string]func(*Config){ "async": func(cfg *Config) { - cfg.Batcher.enabledSet = false + cfg.QueueBatchConfig.WaitForResult = false + cfg.QueueBatchConfig.BlockOnOverflow = false }, "sync": func(cfg *Config) { - cfg.Batcher.enabledSet = true - 
cfg.Batcher.Enabled = true
-					cfg.Batcher.FlushTimeout = 10 * time.Millisecond
+					cfg.QueueBatchConfig.WaitForResult = true
+					cfg.QueueBatchConfig.BlockOnOverflow = false
 				},
 			}
 			for _, tt := range tableTests {
@@ -943,11 +944,12 @@ func TestExporterLogs(t *testing.T) {
 
 			cfgs := map[string]func(*Config){
 				"async": func(cfg *Config) {
-					cfg.Batcher.Enabled = false
+					cfg.QueueBatchConfig.WaitForResult = false
+					cfg.QueueBatchConfig.BlockOnOverflow = false
 				},
 				"sync": func(cfg *Config) {
-					cfg.Batcher.Enabled = true
-					cfg.Batcher.FlushTimeout = 10 * time.Millisecond
+					cfg.QueueBatchConfig.WaitForResult = true
+					cfg.QueueBatchConfig.BlockOnOverflow = false
 				},
 			}
 			for _, tt := range tableTests {
@@ -993,6 +995,10 @@ func TestExporterMetrics(t *testing.T) {
 	t.Run("publish with success", func(t *testing.T) {
 		rec := newBulkRecorder()
 		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
 			rec.Record(docs)
 			return itemsAllOK(docs)
 		})
 
 		exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
 			cfg.Mapping.Mode = "ecs"
 		})
-		dp := pmetric.NewNumberDataPoint()
-		dp.SetDoubleValue(123.456)
-		dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
-		mustSendMetricSumDataPoints(t, exporter, dp)
-		mustSendMetricGaugeDataPoints(t, exporter, dp)
+		dpSum := pmetric.NewNumberDataPoint()
+		dpSum.SetDoubleValue(123.456)
+		dpSum.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-2 * time.Second)))
+		mustSendMetricSumDataPoints(t, exporter, dpSum)
+
+		dpGauge := pmetric.NewNumberDataPoint()
+		dpGauge.SetDoubleValue(123.456)
+		// Keep timestamps different to avoid metric grouping putting both points in the same doc
+		dpGauge.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second)))
+		mustSendMetricGaugeDataPoints(t, exporter, dpGauge)
 
 		rec.WaitItems(2)
 	})
@@ -2435,7 +2446,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) {
 			})
 
 			logs := createLogs(tc.scopes...)
-			exporter := newTestLogsExporter(t, server.URL, setAllowedMappingModes)
+			exporter := newTestLogsExporter(t, server.URL, setAllowedMappingModes, func(cfg *Config) {
+				// Set wait_for_result to true so that errors are reported directly via Consume*
+				cfg.QueueBatchConfig.WaitForResult = true
+			})
 			err := exporter.ConsumeLogs(tc.ctx, logs)
 			if tc.expectErr != "" {
 				require.EqualError(t, err, tc.expectErr)
@@ -2460,7 +2474,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) {
 			})
 
 			metrics := createMetrics(tc.scopes...)
-			exporter := newTestMetricsExporter(t, server.URL, setAllowedMappingModes)
+			exporter := newTestMetricsExporter(t, server.URL, setAllowedMappingModes, func(cfg *Config) {
+				// Set wait_for_result to true so that errors are reported directly via Consume*
+				cfg.QueueBatchConfig.WaitForResult = true
+			})
 			err := exporter.ConsumeMetrics(tc.ctx, metrics)
 			if tc.expectErr != "" {
 				require.EqualError(t, err, tc.expectErr)
@@ -2478,7 +2495,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) {
 	t.Run("profiles", func(t *testing.T) {
 		// Profiles are only supported by otel mode, so just verify that
 		// the metadata is picked up and invalid modes are rejected.
-	exporter := newTestProfilesExporter(t, "https://testing.invalid", setAllowedMappingModes)
+	exporter := newTestProfilesExporter(t, "https://testing.invalid", setAllowedMappingModes, func(cfg *Config) {
+		// Set wait_for_result to true so that errors are reported directly via Consume*
+		cfg.QueueBatchConfig.WaitForResult = true
+	})
 	err := exporter.ConsumeProfiles(noneContext, pprofile.NewProfiles())
 	assert.EqualError(t, err,
 		`Permanent error: invalid context mapping mode: unsupported mapping mode "none", expected one of ["ecs" "otel"]`,
@@ -2499,7 +2519,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) {
 			})
 
 			traces := createTraces(tc.scopes...)
-			exporter := newTestTracesExporter(t, server.URL, setAllowedMappingModes)
+			exporter := newTestTracesExporter(t, server.URL, setAllowedMappingModes, func(cfg *Config) {
+				// Set wait_for_result to true so that errors are reported directly via Consume*
+				cfg.QueueBatchConfig.WaitForResult = true
+			})
 			err := exporter.ConsumeTraces(tc.ctx, traces)
 			if tc.expectErr != "" {
 				require.EqualError(t, err, tc.expectErr)
@@ -2548,14 +2571,14 @@ func TestExporterBatcher(t *testing.T) {
 	var requests []*http.Request
 	testauthID := component.NewID(component.MustNewType("authtest"))
 	exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
-		cfg.Batcher = BatcherConfig{
-			Enabled: false,
-			// sync bulk indexer is used without batching
+		cfg.QueueBatchConfig.Enabled = true
+		cfg.QueueBatchConfig.WaitForResult = true
+		cfg.QueueBatchConfig.Batch = configoptional.Some(exporterhelper.BatchConfig{
 			FlushTimeout: 200 * time.Millisecond,
 			Sizer:        exporterhelper.RequestSizerTypeItems,
 			MinSize:      8192,
-			enabledSet:   true,
-		}
+			MaxSize:      10000,
+		})
 		cfg.Auth = configoptional.Some(configauth.Config{AuthenticatorID: testauthID})
 		cfg.Retry.Enabled = false
 	})
@@ -2590,8 +2613,9 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor
 	f := NewFactory()
 	cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
 		cfg.Endpoints = []string{url}
-		cfg.NumWorkers = 1
-		cfg.Flush.Interval = 10 * time.Millisecond
+		cfg.QueueBatchConfig.NumConsumers = 1
+		// Batch is configured by default so we can directly edit flush timeout
+		cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond
 	}}, fns...)...)
 	require.NoError(t, xconfmap.Validate(cfg))
 	exp, err := f.CreateTraces(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
@@ -2609,8 +2633,9 @@ func newTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xex
 	f := NewFactory().(xexporter.Factory)
 	cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
 		cfg.Endpoints = []string{url}
-		cfg.NumWorkers = 1
-		cfg.Flush.Interval = 10 * time.Millisecond
+		cfg.QueueBatchConfig.NumConsumers = 1
+		// Batch is configured by default so we can directly edit flush timeout
+		cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond
 	}}, fns...)...)
 	require.NoError(t, xconfmap.Validate(cfg))
 	exp, err := f.CreateProfiles(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
@@ -2628,8 +2653,9 @@ func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) expo
 	f := NewFactory()
 	cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
 		cfg.Endpoints = []string{url}
-		cfg.NumWorkers = 1
-		cfg.Flush.Interval = 10 * time.Millisecond
+		cfg.QueueBatchConfig.NumConsumers = 1
+		// Batch is configured by default so we can directly edit flush timeout
+		cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond
 	}}, fns...)...)
 	require.NoError(t, xconfmap.Validate(cfg))
 	exp, err := f.CreateMetrics(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
@@ -2657,8 +2683,9 @@ func newUnstartedTestLogsExporter(t *testing.T, url string, fns ...func(*Config)
 	f := NewFactory()
 	cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
 		cfg.Endpoints = []string{url}
-		cfg.NumWorkers = 1
-		cfg.Flush.Interval = 10 * time.Millisecond
+		cfg.QueueBatchConfig.NumConsumers = 1
+		// Batch is configured by default so we can directly edit flush timeout
+		cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond
 	}}, fns...)...)
 	require.NoError(t, xconfmap.Validate(cfg))
 	exp, err := f.CreateLogs(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
@@ -2677,7 +2704,9 @@ func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.Lo
 }
 
 func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) {
-	logs.MarkReadOnly()
+	if !exporter.Capabilities().MutatesData {
+		logs.MarkReadOnly()
+	}
 	err := exporter.ConsumeLogs(t.Context(), logs)
 	require.NoError(t, err)
 }
@@ -2685,10 +2714,10 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) {
 func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) {
 	metrics := pmetric.NewMetrics()
 	scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
+	metric := scopeMetrics.Metrics().AppendEmpty()
+	metric.SetEmptySum()
+	metric.SetName("sum")
 	for _, dataPoint := range dataPoints {
-		metric := scopeMetrics.Metrics().AppendEmpty()
-		metric.SetEmptySum()
-		metric.SetName("sum")
 		dataPoint.CopyTo(metric.Sum().DataPoints().AppendEmpty())
 	}
 	mustSendMetrics(t, exporter, metrics)
@@ -2697,17 +2726,19 @@ func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPo
 func mustSendMetricGaugeDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) {
 	metrics := pmetric.NewMetrics()
 	scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
+	metric := scopeMetrics.Metrics().AppendEmpty()
+	metric.SetEmptyGauge()
+	metric.SetName("gauge")
 	for _, dataPoint := range dataPoints {
-		metric := scopeMetrics.Metrics().AppendEmpty()
-		metric.SetEmptyGauge()
-		metric.SetName("gauge")
 		dataPoint.CopyTo(metric.Gauge().DataPoints().AppendEmpty())
 	}
 	mustSendMetrics(t, exporter, metrics)
 }
 
 func mustSendMetrics(t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) {
-	metrics.MarkReadOnly()
+	if !exporter.Capabilities().MutatesData {
+		metrics.MarkReadOnly()
+	}
 	err := exporter.ConsumeMetrics(t.Context(), metrics)
 	require.NoError(t, err)
 }
@@ -2723,7 +2754,9 @@ func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span)
 }
 
 func mustSendTraces(t *testing.T, exporter exporter.Traces, traces ptrace.Traces) {
-	traces.MarkReadOnly()
+	if !exporter.Capabilities().MutatesData {
+		
traces.MarkReadOnly() + } err := exporter.ConsumeTraces(t.Context(), traces) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 1a46365a1816b..4a9937e12bde2 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configoptional" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" @@ -41,8 +40,18 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + // TODO(lahsivjar): This is deviating from the original defaults: + // - block_on_overflow: by default this is set to `false` i.e. clients will get + // retryable error when queue is full. However, the original behaviour is + // that we will block until a consumer is free ([ref](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/58308d77fa47e74bd8ed402ef4cd944cc2a4126a/exporter/elasticsearchexporter/bulkindexer.go#L325-L329)) qs := exporterhelper.NewDefaultQueueConfig() - qs.Enabled = false + qs.QueueSize = 10 + qs.Batch = configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + MinSize: 5e+6, + MaxSize: 10e+6, + Sizer: exporterhelper.RequestSizerTypeBytes, + }) httpClientConfig := confighttp.NewDefaultClientConfig() httpClientConfig.Timeout = 90 * time.Second @@ -83,15 +92,6 @@ func createDefaultConfig() component.Config { LogFailedDocsInputRateLimit: time.Second, }, IncludeSourceOnError: nil, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, - Flush: FlushSettings{ - Bytes: 5e+6, - Interval: 10 * time.Second, - }, } } @@ -220,53 +220,21 @@ func exporterhelperOptions( shutdown component.ShutdownFunc, qbs exporterhelper.QueueBatchSettings, ) []exporterhelper.Option { + // not setting capabilities as they will default to non-mutating but will be updated + // by the base-exporter to mutating if batching is enabled. opts := []exporterhelper.Option{ - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithStart(start), exporterhelper.WithShutdown(shutdown), + exporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs), } - qbc := cfg.QueueBatchConfig - switch { - case qbc.Batch.HasValue(): - // Latest queue batch settings are used, prioritize them even if sending queue is disabled - opts = append(opts, exporterhelper.WithQueueBatch(qbc, qbs)) - - // Effectively disable timeout_sender because timeout is enforced in bulk indexer. - // - // We keep timeout_sender enabled in the async mode (sending_queue not enabled OR sending - // queue enabled but batching not enabled OR based on the deprecated batcher setting), to - // ensure sending data to the background workers will not block indefinitely. 
- if qbc.Enabled { - opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0})) - } - case cfg.Batcher.enabledSet: - if cfg.Batcher.Enabled { - qbc.Batch = configoptional.Some(exporterhelper.BatchConfig{ - FlushTimeout: cfg.Batcher.FlushTimeout, - MinSize: cfg.Batcher.MinSize, - MaxSize: cfg.Batcher.MaxSize, - Sizer: cfg.Batcher.Sizer, - }) - - // If the deprecated batcher is enabled without a queue, enable blocking queue to replicate the - // behavior of the deprecated batcher. - if !qbc.Enabled { - qbc.Enabled = true - qbc.WaitForResult = true - } - } - opts = append( - opts, - // Effectively disable timeout_sender because timeout is enforced in bulk indexer. - // - // We keep timeout_sender enabled in the async mode (Batcher.Enabled == nil), - // to ensure sending data to the background workers will not block indefinitely. - exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), - exporterhelper.WithQueue(qbc), - ) - default: - opts = append(opts, exporterhelper.WithQueue(qbc)) + // Effectively disable timeout_sender because timeout is enforced in bulk indexer. + // + // We keep timeout_sender enabled in the async mode (sending_queue not enabled OR sending + // queue enabled but batching not enabled OR based on the deprecated batcher setting), to + // ensure sending data to the background workers will not block indefinitely. + if cfg.QueueBatchConfig.Enabled { + opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0})) } return opts } diff --git a/exporter/elasticsearchexporter/factory_test.go b/exporter/elasticsearchexporter/factory_test.go index 29dd9d1293d1e..70625599fb1b8 100644 --- a/exporter/elasticsearchexporter/factory_test.go +++ b/exporter/elasticsearchexporter/factory_test.go @@ -116,17 +116,6 @@ var signalTestCases = []struct { "endpoints": []string{"http://test:9200"}, }, }, - { - name: "with_deprecated_batcher", - cfg: map[string]any{ - "batcher": map[string]any{ - "enabled": true, - }, - }, - expectedLogs: []string{ - "batcher has been deprecated", - }, - }, { name: "with_sending_queue", cfg: map[string]any{ @@ -136,33 +125,4 @@ var signalTestCases = []struct { }, }, }, - { - name: "with_sending_queue_disabled_and_deprecated_batcher", - cfg: map[string]any{ - "sending_queue": map[string]any{ - "batch": map[string]any{}, - }, - "batcher": map[string]any{ - "enabled": true, - }, - }, - expectedLogs: []string{ - "sending_queue::batch will take preference", - }, - }, - { - name: "with_sending_queue_enabled_and_deprecated_batcher", - cfg: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - "batch": map[string]any{}, - }, - "batcher": map[string]any{ - "enabled": true, - }, - }, - expectedLogs: []string{ - "sending_queue::batch will take preference", - }, - }, } diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 9235f670ec048..aba78bac568d4 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -20,8 +20,6 @@ elasticsearch/trace: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 retry: max_retries: 5 retry_on_status: @@ -47,15 +45,11 @@ elasticsearch/metric: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 retry: max_retries: 5 retry_on_status: - 429 - 500 - sending_queue: - enabled: true elasticsearch/log: tls: insecure: false @@ -76,15 +70,11 @@ elasticsearch/log: api_key: AvFsEiPs== discover: 
on_start: true - flush: - bytes: 10485760 retry: max_retries: 5 retry_on_status: - 429 - 500 - sending_queue: - enabled: true elasticsearch/logstash_format: endpoints: [http://localhost:9200] logstash_format: @@ -97,21 +87,12 @@ elasticsearch/cloudid: cloudid: foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY= elasticsearch/confighttp_endpoint: endpoint: https://elastic.example.com:9200 -elasticsearch/batcher_disabled: - endpoint: https://elastic.example.com:9200 - batcher: - enabled: false elasticsearch/compression_none: endpoint: https://elastic.example.com:9200 compression: none elasticsearch/compression_gzip: endpoint: https://elastic.example.com:9200 compression: gzip -elasticsearch/batcher_minmax_size: - endpoint: https://elastic.example.com:9200 - batcher: - min_size: 100 - max_size: 200 elasticsearch/include_source_on_error: endpoint: https://elastic.example.com:9200 include_source_on_error: true @@ -120,11 +101,22 @@ elasticsearch/metadata_keys: metadata_keys: - x-test-1 - x-test-2 -elasticsearch/queuebatch_enabled: +elasticsearch/sendingqueue_disabled: + endpoint: https://elastic.example.com:9200 + # Setting sending queue without `batch::sizer` can have unintended + # side effects. See https://github.com/open-telemetry/opentelemetry-collector/issues/13860 + sending_queue: + enabled: false + batch: + sizer: bytes +elasticsearch/sendingqueue_enabled: endpoint: https://elastic.example.com:9200 sending_queue: enabled: true - sizer: "items" + sizer: requests num_consumers: 100 batch: - flush_timeout: 10s + flush_timeout: 1s + sizer: items + min_size: 1000 + max_size: 5000 From 459afaebb251f4349ba962b96eaecb9d472ee56e Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 18 Sep 2025 13:43:14 +0100 Subject: [PATCH 02/29] update readme --- exporter/elasticsearchexporter/README.md | 58 ++++++------------------ 1 file changed, 15 insertions(+), 43 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5829a22c7a5e0..5e4fc6d6f02a7 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -82,52 +82,24 @@ All other defaults are as defined by [confighttp]. ### Queuing and batching -The exporter is transitioning from its own internal batching to OpenTelemetry's standard -queueing and batching. The below sections describe the current default and the latest -configuration option for queueing and batching available via the `sending_queue` configuration. - -#### Internal batching by Elasticsearch exporter - -By default, the exporter will perform its own buffering and batching, as configured through the -`flush` config. In this case both `sending_queue` and `batcher` will be unused. The exporter -will perform its own buffering and batching and will issue async requests to Elasticsearch in -all cases other than if any of the following conditions are met: - -- `sending_queue::batch` is defined (irrespective of `sending_queue` being enabled or not) -- `batcher::enabled` is defined (set to `true` or `false`) - -In a future release when the `sending_queue` config is stable, and has feature parity -with the exporter's existing `flush` config, it will be enabled by default. - -Using the `sending_queue` functionality provides several benefits over the default behavior: - - With a persistent queue, or no queue at all, `sending_queue` enables at least once delivery. 
- On the other hand, with the default behavior, the exporter will accept data and process it
-  asynchronously, which interacts poorly with queueing.
- - By ensuring the exporter makes requests to Elasticsearch synchronously (batching disabled),
-  client metadata can be passed through to Elasticsearch requests,
-  e.g. by using the [`headers_setter` extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/headerssetterextension/README.md).
-
-#### Queueing and batching using sending queue
-
 The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper] which
-supports both queueing and batching. However, the sending queue is currently disabled by
-default. Sending queue can be enabled by setting `sending_queue::enabled` to `true`. The batching support in sending queue is also disabled by default. Batching can be enabled by defining `sending_queue::batch`.
+supports both queueing and batching. The default sending queue is configured to do async batching
+with the following configuration:
 
-The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings.
-
-#### Deprecated batcher config
-
-> [!WARNING]
-> The `batcher` config is now deprecated and will be removed in an upcoming version. Check the [queueing and batching](#queueing-and-batching) section for using the `sending_queue` setting that supersedes `batcher`. In the interim, `batcher` configurations are still valid, however, they will be ignored if `sending_queue::batch` is defined even if `sending_queue` is not enabled.
-
-The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/internal/queue_sender.go).
+```yaml
+sending_queue:
+  enabled: true
+  sizer: requests
+  num_consumers: 10
+  queue_size: 10
+  batch:
+    flush_timeout: 10s
+    min_size: 5e+6 # 5MB
+    max_size: 10e+6 # 10MB
+    sizer: bytes
+```
 
-- `batcher`:
-  - `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`.
-  - `sizer` (default=items): Unit of `min_size` and `max_size`. Currently supports only "items", in the future will also support "bytes".
-  - `min_size` (default=5000): Minimum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`.
-  - `max_size` (default=0): Maximum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
-  - `flush_timeout` (default=10s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer.
+The default configurations are chosen to be closer to the previous defaults with the exporter's inbuilt batching. The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings.
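As an aside on what these new defaults imply: if the synchronous behaviour described above is still wanted (for example, so that client metadata set via the `headers_setter` extension reaches each bulk request), batching can be effectively switched off through the same `sending_queue` settings. A minimal sketch using only options referenced in this patch series; the endpoint is a placeholder:

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200  # placeholder
    sending_queue:
      enabled: true
      wait_for_result: true    # block the caller until the bulk request completes
      block_on_overflow: true  # apply back-pressure instead of failing when the queue is full
      batch:
        min_size: 0            # zero minimum flushes immediately, effectively disabling batching
        sizer: bytes
```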
 ### Elasticsearch document routing

From 6c6b9c58ebe223d53908018c9662173805a8b979 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Thu, 18 Sep 2025 14:00:22 +0100
Subject: [PATCH 03/29] Add changelog

---
 .chloggen/esexporter-remove-batcher.yaml | 27 ++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
 create mode 100644 .chloggen/esexporter-remove-batcher.yaml

diff --git a/.chloggen/esexporter-remove-batcher.yaml b/.chloggen/esexporter-remove-batcher.yaml
new file mode 100644
index 0000000000000..360c94261f4e8
--- /dev/null
+++ b/.chloggen/esexporter-remove-batcher.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Remove batcher and related config in favor of sending queue
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [42718]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: Previously deprecated `batcher` configuration is removed. `num_workers` and `flush` are now deprecated and ignored if defined as they conflict with `sending_queue` configurations.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]' +change_logs: [user] From 44aa7200fefc25edee9414105b682d1047d25343 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 19 Sep 2025 11:35:06 +0100 Subject: [PATCH 04/29] fix lint --- exporter/elasticsearchexporter/bulkindexer.go | 30 +++++-------------- .../elasticsearchexporter/bulkindexer_test.go | 3 +- .../elasticsearchexporter/exporter_test.go | 6 ++-- exporter/elasticsearchexporter/factory.go | 4 +-- 4 files changed, 12 insertions(+), 31 deletions(-) diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 85b2dc8d8cd30..c9f5c04156d45 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -70,8 +70,8 @@ func newBulkIndexer( requireDataStream bool, tb *metadata.TelemetryBuilder, logger *zap.Logger, -) (bulkIndexer, error) { - return newSyncBulkIndexer(client, config, requireDataStream, tb, logger), nil +) bulkIndexer { + return newSyncBulkIndexer(client, config, requireDataStream, tb, logger) } func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig { @@ -492,36 +492,20 @@ func (b *bulkIndexers) start( } for _, mode := range allowedMappingModes { - var bi bulkIndexer - bi, err = newBulkIndexer(esClient, cfg, mode == MappingOTel, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + bi := newBulkIndexer(esClient, cfg, mode == MappingOTel, b.telemetryBuilder, set.Logger) b.modes[mode] = &wgTrackingBulkIndexer{bulkIndexer: bi, wg: &b.wg} } - profilingEvents, err := newBulkIndexer(esClient, cfg, true, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingEvents := newBulkIndexer(esClient, cfg, true, b.telemetryBuilder, set.Logger) b.profilingEvents = &wgTrackingBulkIndexer{bulkIndexer: profilingEvents, wg: &b.wg} - profilingStackTraces, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingStackTraces := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) b.profilingStackTraces = &wgTrackingBulkIndexer{bulkIndexer: profilingStackTraces, wg: &b.wg} - profilingStackFrames, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingStackFrames := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) b.profilingStackFrames = &wgTrackingBulkIndexer{bulkIndexer: profilingStackFrames, wg: &b.wg} - profilingExecutables, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingExecutables := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) b.profilingExecutables = &wgTrackingBulkIndexer{bulkIndexer: profilingExecutables, wg: &b.wg} return nil } diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 58f28eec89a0a..bba013bdeb6fa 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -191,7 +191,6 @@ func TestNewBulkIndexer(t *testing.T) { require.NoError(t, err) cfg := createDefaultConfig() - bi, err := newBulkIndexer(client, cfg.(*Config), true, nil, nil) - require.NoError(t, err) + bi := newBulkIndexer(client, cfg.(*Config), true, nil, nil) t.Cleanup(func() { bi.Close(t.Context()) }) } diff --git a/exporter/elasticsearchexporter/exporter_test.go 
b/exporter/elasticsearchexporter/exporter_test.go index 620cbe6fed897..915ca60bf072c 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -2614,7 +2614,7 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} cfg.QueueBatchConfig.NumConsumers = 1 - // Batch is configured by default so we can directy edit flush timeout + // Batch is configured by default so we can directly edit flush timeout cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) require.NoError(t, xconfmap.Validate(cfg)) @@ -2634,7 +2634,7 @@ func newTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xex cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} cfg.QueueBatchConfig.NumConsumers = 1 - // Batch is configured by default so we can directy edit flush timeout + // Batch is configured by default so we can directly edit flush timeout cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) require.NoError(t, xconfmap.Validate(cfg)) @@ -2654,7 +2654,7 @@ func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) expo cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} cfg.QueueBatchConfig.NumConsumers = 1 - // Batch is configured by default so we can directy edit flush timeout + // Batch is configured by default so we can directly edit flush timeout cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) require.NoError(t, xconfmap.Validate(cfg)) diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 4a9937e12bde2..6a92d7b98ba1a 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -25,8 +25,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) -var defaultBatcherMinSizeItems = int64(5000) - // NewFactory creates a factory for Elastic exporter. func NewFactory() exporter.Factory { return xexporter.NewFactory( @@ -42,7 +40,7 @@ func NewFactory() exporter.Factory { func createDefaultConfig() component.Config { // TODO(lahsivjar): This is deviating from the original defaults: // - block_on_overflow: by default this is set to `false` i.e. clients will get - // retryable error when queue is full. However, the original behaviour is + // retryable error when queue is full. 
However, the original behavior is // that we will block until a consumer is free ([ref](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/58308d77fa47e74bd8ed402ef4cd944cc2a4126a/exporter/elasticsearchexporter/bulkindexer.go#L325-L329)) qs := exporterhelper.NewDefaultQueueConfig() qs.QueueSize = 10 From c88d9c63de422cce1ff6c4259f7b2916e75e0f71 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 19 Sep 2025 11:45:41 +0100 Subject: [PATCH 05/29] Fix lint in integration tests --- .../integrationtest/exporter_bench_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 4842cefa6bc84..0ad8dcdef41de 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -171,8 +171,9 @@ func prepareBenchmark( cfg.esCfg.LogsIndex = TestLogsIndex cfg.esCfg.MetricsIndex = TestMetricsIndex cfg.esCfg.TracesIndex = TestTracesIndex - cfg.esCfg.Flush.Interval = 10 * time.Millisecond - cfg.esCfg.NumWorkers = 1 + // sending_queue::batch is defined as a default config + cfg.esCfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Second + cfg.esCfg.QueueBatchConfig.NumConsumers = 1 tc, err := consumer.NewTraces(func(context.Context, ptrace.Traces) error { return nil From 1465da8fde11802ba7d054fbc81eb7c378ae3969 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 23 Sep 2025 15:55:01 +0100 Subject: [PATCH 06/29] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5e4fc6d6f02a7..6e460ccbcac49 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -99,7 +99,7 @@ sending_queue: sizer: bytes ``` -The default configurations are chosen to be closer to the previous defaults with the exporter's inbuilt batching. The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings. +The default configurations are chosen to be closer to the defaults with the exporter's previous inbuilt batching feature. The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings. 
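To make that correspondence concrete, a configuration that previously relied on the exporter's inbuilt buffering, say `num_workers: 4` with `flush: {interval: 10s, bytes: 5000000}`, translates to the `sending_queue` settings roughly as follows (a sketch; the endpoint and values are illustrative, not recommendations):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200  # placeholder
    sending_queue:
      enabled: true
      num_consumers: 4        # stands in for num_workers
      batch:
        flush_timeout: 10s    # stands in for flush::interval
        max_size: 5000000     # stands in for flush::bytes
        sizer: bytes
```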
 ### Elasticsearch document routing

From 5e6e752a2cddab7434a8165bdaf6e737d90d18f2 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Tue, 23 Sep 2025 17:51:29 +0100
Subject: [PATCH 07/29] Update readme and change batching defaults

---
 exporter/elasticsearchexporter/README.md      | 23 +++++++++++++------
 exporter/elasticsearchexporter/config_test.go | 12 +++++-----
 exporter/elasticsearchexporter/factory.go     | 18 ++++-----------
 3 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 6e460ccbcac49..e0b5f687e5987 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -94,8 +94,8 @@ sending_queue:
   queue_size: 10
   batch:
     flush_timeout: 10s
-    min_size: 5e+6 # 5MB
-    max_size: 10e+6 # 10MB
+    min_size: 1e+6 # 1MB
+    max_size: 5e+6 # 5MB
     sizer: bytes
 ```
 
@@ -292,8 +292,8 @@ This can be configured through the following settings:
 The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents. The behaviour of this bulk indexing can be configured with the following settings:
 
-- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. Note this is not applicable if `batcher::enabled` is `true` or `false`.
-- `flush`: Event bulk indexer buffer flush settings
+- `num_workers` (default=runtime.NumCPU()): This config is deprecated now and will be ignored, use the `sending_queue` config. Number of workers publishing bulk requests concurrently. Note this is not applicable if `batcher::enabled` is `true` or `false`.
+- `flush`: This config is deprecated now and will be ignored, use `sending_queue` config. Event bulk indexer buffer flush settings
   - `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB.
   - `interval` (default=10s): Write buffer flush time limit.
 - `retry`: Elasticsearch bulk request retry settings
   - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
   - `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
   - `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
   - `max_interval` (default=1m): Max waiting time if a HTTP request failed.
   - `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.
-
-> [!NOTE]
-> The `flush::interval` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`.
+- `sending_queue`: Configures the queueing and batching behaviour. Below are the defaults (which may vary from standard defaults), for full configuration check the [exporterhelper docs][exporterhelper].
+  - `enabled` (default=true): Enable queueing and batching behaviour.
+  - `num_consumers` (default=10): Number of consumers that dequeue batches.
+  - `wait_for_result` (default=false): If `true`, blocks incoming requests until processed.
+  - `block_on_overflow` (default=false): If `true`, blocks the request until the queue has space.
+  - `sizer` (default=requests): Measure queueing by requests.
+ - `queue_size` (default=10): Maximum size the queue can accept. + - `batch`: + - `flush_timeout` (default=10s): Time after which batch is exported irrespective of other settings. + - `sizer` (default=bytes): Size batches by bytes. + - `min_size` (default=1MB): Min size of the batch. + - `max_size` (default=5MB): Max size of the batch. #### Bulk indexing error response diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 5cf87808f50bf..54cc04215db8d 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -65,8 +65,8 @@ func TestConfig(t *testing.T) { Batch: configoptional.Some(exporterhelper.BatchConfig{ FlushTimeout: 10 * time.Second, Sizer: exporterhelper.RequestSizerTypeBytes, - MinSize: 5000000, - MaxSize: 10000000, + MinSize: 1000000, + MaxSize: 5000000, }), }, Endpoints: []string{ @@ -148,8 +148,8 @@ func TestConfig(t *testing.T) { Batch: configoptional.Some(exporterhelper.BatchConfig{ FlushTimeout: 10 * time.Second, Sizer: exporterhelper.RequestSizerTypeBytes, - MinSize: 5000000, - MaxSize: 10000000, + MinSize: 1000000, + MaxSize: 5000000, }), }, Endpoints: []string{"http://localhost:9200"}, @@ -221,8 +221,8 @@ func TestConfig(t *testing.T) { Batch: configoptional.Some(exporterhelper.BatchConfig{ FlushTimeout: 10 * time.Second, Sizer: exporterhelper.RequestSizerTypeBytes, - MinSize: 5000000, - MaxSize: 10000000, + MinSize: 1000000, + MaxSize: 5000000, }), }, Endpoints: []string{"http://localhost:9200"}, diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 6a92d7b98ba1a..47634de1495ad 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -46,8 +46,8 @@ func createDefaultConfig() component.Config { qs.QueueSize = 10 qs.Batch = configoptional.Some(exporterhelper.BatchConfig{ FlushTimeout: 10 * time.Second, - MinSize: 5e+6, - MaxSize: 10e+6, + MinSize: 1e+6, + MaxSize: 5e+6, Sizer: exporterhelper.RequestSizerTypeBytes, }) @@ -220,19 +220,11 @@ func exporterhelperOptions( ) []exporterhelper.Option { // not setting capabilities as they will default to non-mutating but will be updated // by the base-exporter to mutating if batching is enabled. - opts := []exporterhelper.Option{ + return []exporterhelper.Option{ exporterhelper.WithStart(start), exporterhelper.WithShutdown(shutdown), exporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs), + // Effectively disable timeout_sender because timeout is enforced in bulk indexer. + exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), } - - // Effectively disable timeout_sender because timeout is enforced in bulk indexer. - // - // We keep timeout_sender enabled in the async mode (sending_queue not enabled OR sending - // queue enabled but batching not enabled OR based on the deprecated batcher setting), to - // ensure sending data to the background workers will not block indefinitely. 
- if cfg.QueueBatchConfig.Enabled { - opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0})) - } - return opts } From 3bb6e62ef89b22ff5eba55934ab5633aa8b01886 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 23 Sep 2025 18:00:52 +0100 Subject: [PATCH 08/29] Add message for deprecated config --- exporter/elasticsearchexporter/config.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index f4533b62ca7e5..53cd3b5437b77 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -476,6 +476,12 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { if cfg.TracesDynamicIndex.Enabled { logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") } + if cfg.Flush.Bytes > 0 || cfg.Flush.Interval > 0 { + logger.Warn("flush settings are now deprecated and ignored. Please use `sending_queue` instead.") + } + if cfg.NumWorkers > 0 { + logger.Warn("`num_workers` are now deprecated and ignored. Please use `sending_queue` instead.") + } } func handleTelemetryConfig(cfg *Config, logger *zap.Logger) { From fa5bd512f408b7c03d02636ebc04f4139a565063 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 23 Sep 2025 18:09:35 +0100 Subject: [PATCH 09/29] go mod tidy --- exporter/elasticsearchexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 53c2e6d9bdb5c..b0ae385f0fd8a 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -26,7 +26,6 @@ require ( go.opentelemetry.io/collector/config/configoptional v0.136.0 go.opentelemetry.io/collector/confmap v1.42.0 go.opentelemetry.io/collector/confmap/xconfmap v0.136.0 - go.opentelemetry.io/collector/consumer v1.42.0 go.opentelemetry.io/collector/consumer/consumererror v0.136.0 go.opentelemetry.io/collector/exporter v1.42.0 go.opentelemetry.io/collector/exporter/exporterhelper v0.136.0 @@ -88,6 +87,7 @@ require ( go.opentelemetry.io/collector/config/configmiddleware v1.42.0 // indirect go.opentelemetry.io/collector/config/configretry v1.42.0 // indirect go.opentelemetry.io/collector/config/configtls v1.42.0 // indirect + go.opentelemetry.io/collector/consumer v1.42.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.136.0 // indirect go.opentelemetry.io/collector/consumer/consumertest v0.136.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.136.0 // indirect From f9a5f30023cb20157828a5a9a9726c4b8c9f5284 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 23 Sep 2025 18:35:12 +0100 Subject: [PATCH 10/29] make lint --- exporter/elasticsearchexporter/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 58f3fd8b1862d..c122bd5d34bee 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -223,7 +223,7 @@ func exporterhelperOptions( return []exporterhelper.Option{ exporterhelper.WithStart(start), exporterhelper.WithShutdown(shutdown), - exporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs), + xexporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs), // Effectively disable 
timeout_sender because timeout is enforced in bulk indexer. exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), } From 1aec3c7ac9dd1814cc2060d441d23a435137702b Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 23 Sep 2025 20:26:14 +0100 Subject: [PATCH 11/29] Fix integration tests --- .../integrationtest/datareceiver.go | 34 +++++++++++-------- .../integrationtest/exporter_test.go | 28 +++++---------- 2 files changed, 27 insertions(+), 35 deletions(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index e186f8175faa6..89cbe02933edf 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -67,7 +67,7 @@ type esDataReceiver struct { receiver receiver.Logs endpoint string decodeBulkRequest bool - batcherEnabled *bool + enableBatching bool t testing.TB } @@ -92,9 +92,9 @@ func withDecodeBulkRequest(decode bool) dataReceiverOption { } } -func withBatcherEnabled(enabled bool) dataReceiverOption { +func withBatching(enabled bool) dataReceiverOption { return func(r *esDataReceiver) { - r.batcherEnabled = &enabled + r.enableBatching = enabled } } @@ -153,9 +153,6 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { logs_index: %s metrics_index: %s traces_index: %s - sending_queue: - enabled: true - block_on_overflow: true mapping: mode: otel retry: @@ -169,17 +166,24 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex, ) - if es.batcherEnabled == nil { + if es.enableBatching { cfgFormat += ` - flush: - interval: 1s` + sending_queue: + enabled: true + block_on_overflow: true + batch: + flush_timeout: 1s + sizer: bytes` } else { - cfgFormat += fmt.Sprintf(` - batcher: - flush_timeout: 1s - enabled: %v`, - *es.batcherEnabled, - ) + // Batching is disabled using `min_size` as we are setting batching + // as a default behavior. + cfgFormat += ` + sending_queue: + enabled: true + block_on_overflow: true + batch: + min_size: 0 + sizer: bytes` } return cfgFormat + "\n" } diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index 5a6bf2767b078..52a6e1f93cddc 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -21,11 +21,7 @@ func TestExporter(t *testing.T) { for _, tc := range []struct { name string - // batcherEnabled enables/disables the batch sender. If this is - // nil, then the exporter buffers data itself (legacy behavior), - // whereas if it is non-nil then the exporter will not perform - // any buffering itself. - batcherEnabled *bool + enableBatching bool // restartCollector restarts the OTEL collector. 
Restarting // the collector allows durability testing of the ES exporter @@ -37,12 +33,12 @@ func TestExporter(t *testing.T) { {name: "es_intermittent_http_error", mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, {name: "es_intermittent_doc_error", mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, - {name: "batcher_enabled", batcherEnabled: ptrTo(true)}, - {name: "batcher_enabled_es_intermittent_http_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, - {name: "batcher_enabled_es_intermittent_doc_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, - {name: "batcher_disabled", batcherEnabled: ptrTo(false)}, - {name: "batcher_disabled_es_intermittent_http_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, - {name: "batcher_disabled_es_intermittent_doc_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, + {name: "enable sending_queue batching", enableBatching: true}, + {name: "batcher_enabled_es_intermittent_http_error", enableBatching: true, mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, + {name: "batcher_enabled_es_intermittent_doc_error", enableBatching: true, mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, + {name: "batcher_disabled", enableBatching: false}, + {name: "batcher_disabled_es_intermittent_http_error", enableBatching: false, mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, + {name: "batcher_disabled_es_intermittent_doc_error", enableBatching: false, mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, /* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed {name: "collector_restarts", restartCollector: true}, @@ -50,11 +46,7 @@ func TestExporter(t *testing.T) { */ } { t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) { - var opts []dataReceiverOption - if tc.batcherEnabled != nil { - opts = append(opts, withBatcherEnabled(*tc.batcherEnabled)) - } - runner(t, eventType, tc.restartCollector, tc.mockESErr, opts...) + runner(t, eventType, tc.restartCollector, tc.mockESErr, withBatching(tc.enableBatching)) }) } } @@ -145,7 +137,3 @@ func runner(t *testing.T, eventType string, restartCollector bool, mockESErr err ) tc.ValidateData() } - -func ptrTo[T any](t T) *T { - return &t -} From eaaa5c30a259b565e8e0715564a992efe1a872ba Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:32:27 +0100 Subject: [PATCH 12/29] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index e0b5f687e5987..a396e01c57049 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -292,7 +292,7 @@ This can be configured through the following settings: The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents. 
The behaviour of this bulk indexing can be configured with the following settings: -- `num_workers` (default=runtime.NumCPU()): This config is deprecated now and will be ignored, use the `sending_queue` config. Number of workers publishing bulk requests concurrently. Note this is not applicable if `batcher::enabled` is `true` or `false`. +- `num_workers` (DEPRECATED, use sending_queue::num_consumers instead): This config is deprecated and ignored. Number of workers publishing bulk requests concurrently. - `flush`: This config is deprecated now and will be ignored, use `sending_queue` config. Event bulk indexer buffer flush settings - `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - `interval` (default=10s): Write buffer flush time limit. From 4b51441adf5238a2628ef582616d355f1ae81ad4 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:35:02 +0100 Subject: [PATCH 13/29] Update reademe --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index a396e01c57049..801ad9863bb9e 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -314,7 +314,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `flush_timeout` (default=10s): Time after which batch is exported irrespective of other settings. - `sizer` (default=bytes): Size batches by bytes. - `min_size` (default=1MB): Min size of the batch. - - `max_size` (default=5MB): Max size of the batch. + - `max_size` (default=5MB): Max size of the batch. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. #### Bulk indexing error response From 4e694b938c9bd6f8e95c958ced9bf17e21068a48 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:37:22 +0100 Subject: [PATCH 14/29] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 801ad9863bb9e..667fd77f1c8b8 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -293,7 +293,7 @@ The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing docume The behaviour of this bulk indexing can be configured with the following settings: - `num_workers` (DEPRECATED, use sending_queue::num_consumers instead): This config is deprecated and ignored. Number of workers publishing bulk requests concurrently. -- `flush`: This config is deprecated now and will be ignored, use `sending_queue` config. Event bulk indexer buffer flush settings +- `flush` (DEPRECATED, use sending_queue instead): This config is deprecated and will be ignored. 
Event bulk indexer buffer flush settings - `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - `interval` (default=10s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings From d5c368554c5833172250a4327908466dfab6b124 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:37:30 +0100 Subject: [PATCH 15/29] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 667fd77f1c8b8..9d780bf31fee0 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -294,7 +294,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `num_workers` (DEPRECATED, use sending_queue::num_consumers instead): This config is deprecated and ignored. Number of workers publishing bulk requests concurrently. - `flush` (DEPRECATED, use sending_queue instead): This config is deprecated and will be ignored. Event bulk indexer buffer flush settings - - `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. + - `bytes`: Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - `interval` (default=10s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. From b93b35c2033311b1ad753f44e1c95401a546058b Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:37:38 +0100 Subject: [PATCH 16/29] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 9d780bf31fee0..49c12ea06e10f 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -295,7 +295,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `num_workers` (DEPRECATED, use sending_queue::num_consumers instead): This config is deprecated and ignored. Number of workers publishing bulk requests concurrently. - `flush` (DEPRECATED, use sending_queue instead): This config is deprecated and will be ignored. 
Event bulk indexer buffer flush settings - `bytes`: Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - - `interval` (default=10s): Write buffer flush time limit. + - `interval`: Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. - `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`. From 9ec3412e1c7427696b968b5769924699ef953dad Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:37:47 +0100 Subject: [PATCH 17/29] Update exporter/elasticsearchexporter/config.go Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 53cd3b5437b77..a275040ba143b 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -477,7 +477,7 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") } if cfg.Flush.Bytes > 0 || cfg.Flush.Interval > 0 { - logger.Warn("flush settings are now deprecated and ignored. Please use `sending_queue` instead.") + logger.Warn("flush settings are now deprecated and ignored. Use `sending_queue` instead.") } if cfg.NumWorkers > 0 { logger.Warn("`num_workers` are now deprecated and ignored. Please use `sending_queue` instead.") From adcd6ce3cfb2ac302741f88901bd8f537e55e290 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 24 Sep 2025 11:37:54 +0100 Subject: [PATCH 18/29] Update exporter/elasticsearchexporter/config.go Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index a275040ba143b..620726705b0a0 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -480,7 +480,7 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { logger.Warn("flush settings are now deprecated and ignored. Use `sending_queue` instead.") } if cfg.NumWorkers > 0 { - logger.Warn("`num_workers` are now deprecated and ignored. Please use `sending_queue` instead.") + logger.Warn("num_workers is now deprecated and ignored. 
Use `sending_queue` instead.") } } From 382a62369a1d7356a52c3a668610a27ecaa7144b Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 25 Sep 2025 19:21:07 +0100 Subject: [PATCH 19/29] Fix copy pasta errors --- exporter/elasticsearchexporter/exporter_test.go | 4 ---- .../integrationtest/exporter_bench_test.go | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 2b8631ca9f8a5..8c4b580398511 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -995,10 +995,6 @@ func TestExporterMetrics(t *testing.T) { t.Run("publish with success", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - for _, doc := range docs { - fmt.Println("KKK", string(doc.Action)) - fmt.Println("KKK", string(doc.Document)) - } rec.Record(docs) return itemsAllOK(docs) }) diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 0ad8dcdef41de..f47a4ccf30c48 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -172,7 +172,7 @@ func prepareBenchmark( cfg.esCfg.MetricsIndex = TestMetricsIndex cfg.esCfg.TracesIndex = TestTracesIndex // sending_queue::batch is defined as a default config - cfg.esCfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Second + cfg.esCfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond cfg.esCfg.QueueBatchConfig.NumConsumers = 1 tc, err := consumer.NewTraces(func(context.Context, ptrace.Traces) error { From 1db0f2b07338e13de23355e902ec09ad5268abac Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 26 Sep 2025 21:32:58 +0100 Subject: [PATCH 20/29] Do not ignore num_workers and flush settings --- exporter/elasticsearchexporter/README.md | 8 ++-- exporter/elasticsearchexporter/config.go | 37 ++++++++++++++++-- exporter/elasticsearchexporter/config_test.go | 38 +++++++++++++++++++ .../testdata/config.yaml | 28 ++++++++++++++ 4 files changed, 103 insertions(+), 8 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 49c12ea06e10f..23a7b9b30a9b3 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -292,10 +292,10 @@ This can be configured through the following settings: The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents. The behaviour of this bulk indexing can be configured with the following settings: -- `num_workers` (DEPRECATED, use sending_queue::num_consumers instead): This config is deprecated and ignored. Number of workers publishing bulk requests concurrently. -- `flush` (DEPRECATED, use sending_queue instead): This config is deprecated and will be ignored. Event bulk indexer buffer flush settings - - `bytes`: Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - - `interval`: Write buffer flush time limit. 
+- `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): This config is deprecated and will be used to configure `sending_queue::num_consumers` if it is not explicitly defined. Number of workers publishing bulk requests concurrently.
+- `flush` (DEPRECATED, use `sending_queue` instead): This config is deprecated and will be used to configure different options for `sending_queue` if they are not explicitly defined. Event bulk indexer buffer flush settings
+  - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): This config is deprecated and will be used to configure `sending_queue::batch::max_size` if not explicitly defined. See the `sending_queue::batch::max_size` for more details.
+  - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): This config is deprecated and will be used to configure `sending_queue::batch::flush_timeout` if not explicitly defined. See the `sending_queue::batch::flush_timeout` for more details.
 - `retry`: Elasticsearch bulk request retry settings
   - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
   - `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go
index 620726705b0a0..677b30168c05e 100644
--- a/exporter/elasticsearchexporter/config.go
+++ b/exporter/elasticsearchexporter/config.go
@@ -16,6 +16,7 @@ import (
 	"go.opentelemetry.io/collector/config/configcompression"
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/config/configopaque"
+	"go.opentelemetry.io/collector/confmap"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.uber.org/zap"
 )
@@ -42,7 +43,8 @@ type Config struct {
 	// NumWorkers configures the number of workers publishing bulk requests.
 	//
 	// Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::num_consumers`
-	// instead. Num workers will be ignored if defined and will be dropped in future releases.
+	// instead. If this config is defined and `sending_queue::num_consumers` is not defined then
+	// it will be used to set `sending_queue::num_consumers`.
 	NumWorkers int `mapstructure:"num_workers"`
 
 	// LogsIndex configures the static index used for document routing for logs.
@@ -78,7 +80,8 @@ type Config struct {
 	Retry RetrySettings `mapstructure:"retry"`
 
 	// Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::batch` instead.
-	// Flush settings will be ignored if defined and will be dropped in future releases.
+	// If this config is defined then it will be used to configure sending queue's batch provided
+	// sending queue's configs are not explicitly defined.
 	Flush FlushSettings `mapstructure:"flush"`
 	Mapping MappingsSettings `mapstructure:"mapping"`
 	LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
@@ -203,15 +206,22 @@ type DiscoverySettings struct {
 // all events already serialized into the send-buffer.
 //
 // Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::batch` instead.
-// Flush settings will be ignored if defined and will be dropped in future releases.
+// If this config is defined then it will be used to configure sending queue's batch provided
+// sending queue's configs are not explicitly defined.
 type FlushSettings struct {
 	// Bytes sets the send buffer flushing limit.
-	// Bytes is now deprecated. Use `sending_queue::batch::{min, max}_size with `bytes`
+	// Bytes is now deprecated. Use `sending_queue::batch::{min, max}_size` with `bytes`
 	// sizer to configure batching based on bytes.
+	//
+	// If this config option is defined then it will be used to configure `sending_queue::batch::max_size`
+	// provided it is not explicitly defined.
 	Bytes int `mapstructure:"bytes"`
 
 	// Interval configures the max age of a document in the send buffer.
 	// Interval is now deprecated. Use `sending-queue::batch::flush_timeout` instead.
+	//
+	// If this config option is defined then it will be used to configure `sending_queue::batch::flush_timeout`
+	// provided it is not explicitly defined.
 	Interval time.Duration `mapstructure:"interval"`
 
 	// prevent unkeyed literal initialization
@@ -299,6 +309,25 @@ var (
 
 const defaultElasticsearchEnvName = "ELASTICSEARCH_URL"
 
+func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
+	if err := conf.Unmarshal(cfg); err != nil {
+		return err
+	}
+	if !conf.IsSet("sending_queue::num_consumers") && conf.IsSet("num_workers") {
+		cfg.QueueBatchConfig.NumConsumers = cfg.NumWorkers
+	}
+	if cfg.QueueBatchConfig.Batch.HasValue() {
+		qbCfg := cfg.QueueBatchConfig.Batch.Get()
+		if !conf.IsSet("sending_queue::batch::flush_timeout") && conf.IsSet("flush::interval") {
+			qbCfg.FlushTimeout = cfg.Flush.Interval
+		}
+		if !conf.IsSet("sending_queue::batch::max_size") && conf.IsSet("flush::bytes") {
+			qbCfg.MaxSize = int64(cfg.Flush.Bytes)
+		}
+	}
+	return nil
+}
+
 // Validate validates the elasticsearch server configuration.
 func (cfg *Config) Validate() error {
 	endpoints, err := cfg.endpoints()
diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go
index 54cc04215db8d..744316c18c146 100644
--- a/exporter/elasticsearchexporter/config_test.go
+++ b/exporter/elasticsearchexporter/config_test.go
@@ -368,6 +368,44 @@ func TestConfig(t *testing.T) {
 			),
 		}),
 	},
+	{
+		id:         component.NewIDWithName(metadata.Type, "backward_compat_for_deprecated_cfgs/new_config_takes_priority"),
+		configFile: "config.yaml",
+		expected: withDefaultConfig(func(cfg *Config) {
+			cfg.Endpoint = "https://elastic.example.com:9200"
+
+			cfg.NumWorkers = 11
+			cfg.Flush = FlushSettings{
+				Bytes:    1001,
+				Interval: 11 * time.Second,
+			}
+			cfg.QueueBatchConfig.NumConsumers = 111
+			// QueueBatchConfig is set by default
+			qbCfg := cfg.QueueBatchConfig.Batch.Get()
+			qbCfg.FlushTimeout = 111 * time.Second
+			qbCfg.MaxSize = 1_000_001
+			qbCfg.Sizer = exporterhelper.RequestSizerTypeBytes
+		}),
+	},
+	{
+		id:         component.NewIDWithName(metadata.Type, "backward_compat_for_deprecated_cfgs/fallback_to_old_cfg"),
+		configFile: "config.yaml",
+		expected: withDefaultConfig(func(cfg *Config) {
+			cfg.Endpoint = "https://elastic.example.com:9200"
+
+			cfg.NumWorkers = 11
+			cfg.Flush = FlushSettings{
+				Bytes:    1_000_001,
+				Interval: 11 * time.Second,
+			}
+			cfg.QueueBatchConfig.NumConsumers = 11
+			// QueueBatchConfig is set by default
+			qbCfg := cfg.QueueBatchConfig.Batch.Get()
+			qbCfg.FlushTimeout = 11 * time.Second
+			qbCfg.MaxSize = 1_000_001
+			qbCfg.Sizer = exporterhelper.RequestSizerTypeBytes
+		}),
+	},
 }
 
 for _, tt := range tests {
diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml
index aba78bac568d4..f79ae0c8fe556 100644
--- a/exporter/elasticsearchexporter/testdata/config.yaml
+++
b/exporter/elasticsearchexporter/testdata/config.yaml @@ -120,3 +120,31 @@ elasticsearch/sendingqueue_enabled: sizer: items min_size: 1000 max_size: 5000 +elasticsearch/backward_compat_for_deprecated_cfgs/new_config_takes_priority: + endpoint: https://elastic.example.com:9200 + # Should be ignored and left as-is + num_workers: 11 + flush: + interval: 11s + bytes: 1001 + # Should take precedence + sending_queue: + enabled: true + sizer: requests + num_consumers: 111 + batch: + flush_timeout: 111s + max_size: 1_000_001 + sizer: bytes +elasticsearch/backward_compat_for_deprecated_cfgs/fallback_to_old_cfg: + endpoint: https://elastic.example.com:9200 + # Should be used to set sending_queue config + num_workers: 11 + flush: + interval: 11s + bytes: 1_000_001 + sending_queue: + enabled: true + sizer: requests + batch: + sizer: bytes From 9f3c30e3683a92a3b330a33d43fb0da9bf24cb88 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 1 Oct 2025 20:14:03 +0100 Subject: [PATCH 21/29] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 23a7b9b30a9b3..d6400cc12f0ee 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -292,7 +292,7 @@ This can be configured through the following settings: The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents. The behaviour of this bulk indexing can be configured with the following settings: -- `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): This config is deprecated and will be used to configure `sending_queue::num_consumers` if it is not explicitly defined. Number of workers publishing bulk requests concurrently. +- `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): This config is deprecated and will be used to configure `sending_queue::num_consumers` if `sending_queue::num_consumers` is not explicitly defined. Number of workers publishing bulk requests concurrently. - `flush` (DEPRECATED, use `sending_queue` instead): This config is deprecated and will be used to configure different options for `sending_queue` if they are not explicitly defined. Event bulk indexer buffer flush settings - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): This config is deprecated and will be used to configure `sending_queue::batch::max_size` if not explicitly defined. See the `sending_queue::batch::max_size` for more details. - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): This config is deprecated and will be used to configure `sending_queue::batch::flush_timeout` if not explicitly defind. See the `sending_queue::batch::flush_timeout` for more details. 
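For readers tracking the deprecations above, the following sketch shows an old-style configuration and its explicit `sending_queue` equivalent. The endpoint and all numeric values are illustrative only; the key names come from the README and test fixtures in this series.

```yaml
# Deprecated style: still accepted, and mapped onto sending_queue
# at unmarshal time when the new options are not explicitly set.
elasticsearch:
  endpoint: https://elastic.example.com:9200
  num_workers: 8          # maps to sending_queue::num_consumers
  flush:
    interval: 5s          # maps to sending_queue::batch::flush_timeout
    bytes: 5000000        # maps to sending_queue::batch::max_size

# Equivalent configuration using the new options.
elasticsearch:
  endpoint: https://elastic.example.com:9200
  sending_queue:
    enabled: true
    num_consumers: 8
    batch:
      flush_timeout: 5s
      sizer: bytes
      max_size: 5000000
```

The mapping only applies when the corresponding `sending_queue` option is absent, as implemented in `Config.Unmarshal` above.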
From 19e215f2b4fb03a94dc6c879beb41a2728f3aa5b Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Wed, 1 Oct 2025 20:14:18 +0100
Subject: [PATCH 22/29] Update exporter/elasticsearchexporter/README.md

Co-authored-by: Carson Ip
---
 exporter/elasticsearchexporter/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index d6400cc12f0ee..734aa5664281f 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -293,7 +293,7 @@ The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing docume
 The behaviour of this bulk indexing can be configured with the following settings:

 - `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): This config is deprecated and will be used to configure `sending_queue::num_consumers` if `sending_queue::num_consumers` is not explicitly defined. Number of workers publishing bulk requests concurrently.
-- `flush` (DEPRECATED, use `sending_queue` instead): This config is deprecated and will be used to configure different options for `sending_queue` if they are not explicitly defined. Event bulk indexer buffer flush settings
+- `flush` (DEPRECATED, use `sending_queue` instead): This config is deprecated and will be used to configure different options for `sending_queue` if `sending_queue` options are not explicitly defined. Event bulk indexer buffer flush settings
   - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): This config is deprecated and will be used to configure `sending_queue::batch::max_size` if not explicitly defined. See the `sending_queue::batch::max_size` for more details.
   - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): This config is deprecated and will be used to configure `sending_queue::batch::flush_timeout` if not explicitly defind. See the `sending_queue::batch::flush_timeout` for more details.
 - `retry`: Elasticsearch bulk request retry settings

From 6717fc3e65ba41cdf16379fa5a8088aa8ae7a9ae Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Wed, 1 Oct 2025 20:14:31 +0100
Subject: [PATCH 23/29] Update exporter/elasticsearchexporter/README.md

Co-authored-by: Carson Ip
---
 exporter/elasticsearchexporter/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 734aa5664281f..8b8fe2e99637e 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -294,7 +294,7 @@ The behaviour of this bulk indexing can be configured with the following setting
 - `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): This config is deprecated and will be used to configure `sending_queue::num_consumers` if `sending_queue::num_consumers` is not explicitly defined. Number of workers publishing bulk requests concurrently.
 - `flush` (DEPRECATED, use `sending_queue` instead): This config is deprecated and will be used to configure different options for `sending_queue` if `sending_queue` options are not explicitly defined. Event bulk indexer buffer flush settings
-  - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): This config is deprecated and will be used to configure `sending_queue::batch::max_size` if not explicitly defined. See the `sending_queue::batch::max_size` for more details.
+  - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): This config is deprecated and will be used to configure `sending_queue::batch::max_size` if `sending_queue::batch::max_size` is not explicitly defined. See the `sending_queue::batch::max_size` for more details.
   - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): This config is deprecated and will be used to configure `sending_queue::batch::flush_timeout` if not explicitly defind. See the `sending_queue::batch::flush_timeout` for more details.
 - `retry`: Elasticsearch bulk request retry settings

From fe36c036374448036da77c855236a5d36036edcf Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Wed, 1 Oct 2025 20:14:40 +0100
Subject: [PATCH 24/29] Update exporter/elasticsearchexporter/README.md

Co-authored-by: Carson Ip
---
 exporter/elasticsearchexporter/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 8b8fe2e99637e..54ca67f17ba71 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -295,7 +295,7 @@ The behaviour of this bulk indexing can be configured with the following setting
 - `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): This config is deprecated and will be used to configure `sending_queue::num_consumers` if `sending_queue::num_consumers` is not explicitly defined. Number of workers publishing bulk requests concurrently.
 - `flush` (DEPRECATED, use `sending_queue` instead): This config is deprecated and will be used to configure different options for `sending_queue` if `sending_queue` options are not explicitly defined. Event bulk indexer buffer flush settings
   - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): This config is deprecated and will be used to configure `sending_queue::batch::max_size` if `sending_queue::batch::max_size` is not explicitly defined. See the `sending_queue::batch::max_size` for more details.
-  - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): This config is deprecated and will be used to configure `sending_queue::batch::flush_timeout` if not explicitly defind. See the `sending_queue::batch::flush_timeout` for more details.
+  - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): This config is deprecated and will be used to configure `sending_queue::batch::flush_timeout` if `sending_queue::batch::flush_timeout` is not explicitly defined. See the `sending_queue::batch::flush_timeout` for more details.
 - `retry`: Elasticsearch bulk request retry settings
   - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
   - `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
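The precedence rules spelled out in the README edits above can be summarized with a sketch mirroring the `new_config_takes_priority` fixture from the test data earlier in this series (the values are the fixture's, not recommendations):

```yaml
elasticsearch:
  endpoint: https://elastic.example.com:9200
  num_workers: 11    # ignored: sending_queue::num_consumers is set explicitly
  flush:
    interval: 11s    # ignored: sending_queue::batch::flush_timeout is set explicitly
    bytes: 1001      # ignored: sending_queue::batch::max_size is set explicitly
  sending_queue:
    enabled: true
    num_consumers: 111
    batch:
      flush_timeout: 111s
      max_size: 1_000_001
      sizer: bytes
```

If any of the explicit `sending_queue` values were removed, the corresponding deprecated value would take effect instead, as exercised by the `fallback_to_old_cfg` fixture.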
From cd1be4f6480640b743d43c3f94bf7f2fe26771d2 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Thu, 2 Oct 2025 10:15:08 +0100
Subject: [PATCH 25/29] Account for pdata<>ndjson size discrepancy

---
 exporter/elasticsearchexporter/README.md      |  2 +-
 exporter/elasticsearchexporter/bulkindexer.go | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 54ca67f17ba71..4988784955697 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -312,7 +312,7 @@ The behaviour of this bulk indexing can be configured with the following setting
   - `queue_size` (default=10): Maximum size the queue can accept.
   - `batch`:
     - `flush_timeout` (default=10s): Time after which batch is exported irrespective of other settings.
-    - `sizer` (default=bytes): Size batches by bytes.
+    - `sizer` (default=bytes): Size batches by bytes. Note that bytes here are based on the pdata model and not on the NDJSON docs that will constitute the bulk indexer requests. To address this discrepancy, the bulk indexers may also flush when their uncompressed NDJSON size exceeds the configured max_size, since the pdata representation of a batch can be smaller than its NDJSON encoding.
     - `min_size` (default=1MB): Min size of the batch.
     - `max_size` (default=5MB): Max size of the batch. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB.

diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go
index c9f5c04156d45..885103e4bf454 100644
--- a/exporter/elasticsearchexporter/bulkindexer.go
+++ b/exporter/elasticsearchexporter/bulkindexer.go
@@ -20,6 +20,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configcompression"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
@@ -115,8 +116,16 @@ func newSyncBulkIndexer(
 	tb *metadata.TelemetryBuilder,
 	logger *zap.Logger,
 ) *syncBulkIndexer {
+	var maxFlushBytes int64
+	if config.QueueBatchConfig.Batch.HasValue() {
+		batch := config.QueueBatchConfig.Batch.Get()
+		if batch.Sizer == exporterhelper.RequestSizerTypeBytes {
+			maxFlushBytes = batch.MaxSize
+		}
+	}
 	return &syncBulkIndexer{
 		config:        bulkIndexerConfig(client, config, requireDataStream),
+		maxFlushBytes: maxFlushBytes,
 		flushTimeout:  config.Timeout,
 		retryConfig:   config.Retry,
 		metadataKeys:  config.MetadataKeys,
@@ -128,6 +137,7 @@ func newSyncBulkIndexer(

 type syncBulkIndexer struct {
 	config           docappender.BulkIndexerConfig
+	maxFlushBytes    int64
 	flushTimeout     time.Duration
 	retryConfig      RetrySettings
 	metadataKeys     []string
@@ -180,6 +190,12 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index, docID, pipeline
 			getAttributesFromMetadataKeys(ctx, s.s.metadataKeys)...),
 		),
 	)
+	// sending_queue computes flush sizes on the pdata model, whereas bulk
+	// indexers operate on NDJSON. Force a flush when the uncompressed NDJSON
+	// length exceeds the configured max flush size.
+	if s.s.maxFlushBytes > 0 && int64(s.bi.UncompressedLen()) >= s.s.maxFlushBytes {
+		return s.Flush(ctx)
+	}
 	return nil
 }

From 19af7b0cf4520a0e23fa84e22c4e3a2cf7be4495 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Fri, 3 Oct 2025 10:21:20 +0100
Subject: [PATCH 26/29] Make block_on_overflow default to true

---
 exporter/elasticsearchexporter/factory.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go
index c122bd5d34bee..724eb965d6b5f 100644
--- a/exporter/elasticsearchexporter/factory.go
+++ b/exporter/elasticsearchexporter/factory.go
@@ -38,12 +38,9 @@ func NewFactory() exporter.Factory {
 }

 func createDefaultConfig() component.Config {
-	// TODO(lahsivjar): This is deviating from the original defaults:
-	// - block_on_overflow: by default this is set to `false` i.e. clients will get
-	//   retryable error when queue is full. However, the original behavior is
-	//   that we will block until a consumer is free ([ref](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/58308d77fa47e74bd8ed402ef4cd944cc2a4126a/exporter/elasticsearchexporter/bulkindexer.go#L325-L329))
 	qs := exporterhelper.NewDefaultQueueConfig()
 	qs.QueueSize = 10
+	qs.BlockOnOverflow = true
 	qs.Batch = configoptional.Some(exporterhelper.BatchConfig{
 		FlushTimeout: 10 * time.Second,
 		MinSize:      1e+6,

From 51747c28f51fda03f478ee4a51a19cdc855ed976 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Fri, 3 Oct 2025 11:47:49 +0100
Subject: [PATCH 27/29] Fix tests for block_on_overflow set to true

---
 exporter/elasticsearchexporter/config_test.go | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go
index 744316c18c146..ed0b76c17f7ad 100644
--- a/exporter/elasticsearchexporter/config_test.go
+++ b/exporter/elasticsearchexporter/config_test.go
@@ -58,10 +58,11 @@ func TestConfig(t *testing.T) {
 			id: component.NewIDWithName(metadata.Type, "trace"),
 			expected: &Config{
 				QueueBatchConfig: exporterhelper.QueueBatchConfig{
-					Enabled:      true,
-					NumConsumers: 10,
-					QueueSize:    10,
-					Sizer:        exporterhelper.RequestSizerTypeRequests,
+					Enabled:         true,
+					NumConsumers:    10,
+					QueueSize:       10,
+					BlockOnOverflow: true,
+					Sizer:           exporterhelper.RequestSizerTypeRequests,
 					Batch: configoptional.Some(exporterhelper.BatchConfig{
 						FlushTimeout: 10 * time.Second,
 						Sizer:        exporterhelper.RequestSizerTypeBytes,
@@ -141,10 +142,11 @@ func TestConfig(t *testing.T) {
 			configFile: "config.yaml",
 			expected: &Config{
 				QueueBatchConfig: exporterhelper.QueueBatchConfig{
-					Enabled:      true,
-					NumConsumers: 10,
-					QueueSize:    10,
-					Sizer:        exporterhelper.RequestSizerTypeRequests,
+					Enabled:         true,
+					NumConsumers:    10,
+					QueueSize:       10,
+					BlockOnOverflow: true,
+					Sizer:           exporterhelper.RequestSizerTypeRequests,
 					Batch: configoptional.Some(exporterhelper.BatchConfig{
 						FlushTimeout: 10 * time.Second,
 						Sizer:        exporterhelper.RequestSizerTypeBytes,
@@ -214,10 +216,11 @@ func TestConfig(t *testing.T) {
 			configFile: "config.yaml",
 			expected: &Config{
 				QueueBatchConfig: exporterhelper.QueueBatchConfig{
-					Enabled:      true,
-					NumConsumers: 10,
-					QueueSize:    10,
-					Sizer:        exporterhelper.RequestSizerTypeRequests,
+					Enabled:         true,
+					NumConsumers:    10,
+					QueueSize:       10,
+					BlockOnOverflow: true,
+					Sizer:           exporterhelper.RequestSizerTypeRequests,
 					Batch: configoptional.Some(exporterhelper.BatchConfig{
 						FlushTimeout: 10 * time.Second,
 						Sizer:        exporterhelper.RequestSizerTypeBytes,
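After this change, the defaults produced by `createDefaultConfig` correspond roughly to the following YAML, assembled from the factory, README, and test expectations above. Treat it as an illustrative sketch rather than authoritative documentation; the endpoint is a placeholder, with no default implied.

```yaml
elasticsearch:
  endpoint: https://elastic.example.com:9200  # placeholder
  sending_queue:
    enabled: true
    num_consumers: 10
    queue_size: 10
    block_on_overflow: true  # block producers instead of returning a retryable error
    sizer: requests
    batch:
      flush_timeout: 10s
      sizer: bytes
      min_size: 1000000  # 1MB
      max_size: 5000000  # 5MB
```

With `block_on_overflow: true`, enqueueing blocks until a queue slot frees up, restoring the pre-queue blocking behaviour referenced in the removed TODO.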
From 37b48f5ca90305bb1ea938c79106dd5aa4b56608 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Fri, 3 Oct 2025 17:45:05 +0100
Subject: [PATCH 28/29] fix readme

---
 exporter/elasticsearchexporter/bulkindexer.go | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go
index 885103e4bf454..39675576a8a45 100644
--- a/exporter/elasticsearchexporter/bulkindexer.go
+++ b/exporter/elasticsearchexporter/bulkindexer.go
@@ -53,13 +53,6 @@ type bulkIndexerSession interface {
 	End()

 	// Flush flushes any documents added to the bulk indexing session.
-	//
-	// The behavior of Flush depends on whether the bulk indexer is
-	// synchronous or asynchronous. Calling Flush on an asynchronous bulk
-	// indexer session is effectively a no-op; flushing will be done in
-	// the background. Calling Flush on a synchronous bulk indexer session
-	// will wait for bulk indexing of added documents to complete,
-	// successfully or not.
 	Flush(context.Context) error
 }
@@ -472,7 +465,7 @@ type bulkIndexers struct {
 	// wg tracks active sessions
 	wg sync.WaitGroup

-	// NOTE(axw) when we get rid of the async bulk indexer there would be
+	// NOTE(axw) we have removed the async bulk indexer and there should be
 	// no reason for having one per mode or for different document types.
 	// Instead, the caller can create separate sessions as needed, and we
 	// can either have one for required_data_stream=true and one for false,

From e4462c634a89392ca9ad63ff666e6c74e5c60f6f Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Tue, 7 Oct 2025 10:07:57 +0100
Subject: [PATCH 29/29] slight change to changelog wording

---
 .chloggen/esexporter-remove-batcher.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.chloggen/esexporter-remove-batcher.yaml b/.chloggen/esexporter-remove-batcher.yaml
index 360c94261f4e8..fc5eadd871659 100644
--- a/.chloggen/esexporter-remove-batcher.yaml
+++ b/.chloggen/esexporter-remove-batcher.yaml
@@ -15,7 +15,7 @@ issues: [42718]
 # (Optional) One or more lines of additional information to render under the primary note.
 # These lines will be padded with 2 spaces and then inserted directly into the document.
 # Use pipe (|) for multiline entries.
-subtext: Previously deprecated `batcher` configuration is removed. `num_consumers` and `flush` are now deprecated and ignored if defined as they conflict with `sending_queue` configurations.
+subtext: Previously deprecated `batcher` configuration is removed. `num_workers` and `flush` are now deprecated as they conflict with `sending_queue` configurations.

 # If your change doesn't affect end users or the exported elements of any package,
 # you should instead start your pull request title with [chore] or use the "Skip Changelog" label.