diff --git a/.chloggen/esexporter-remove-batcher.yaml b/.chloggen/esexporter-remove-batcher.yaml
new file mode 100644
index 0000000000000..fc5eadd871659
--- /dev/null
+++ b/.chloggen/esexporter-remove-batcher.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Remove batcher and related config in favor of sending queue
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [42718]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: Previously deprecated `batcher` configuration is removed. `num_workers` and `flush` are now deprecated as they conflict with `sending_queue` configurations.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 5829a22c7a5e0..4988784955697 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -82,52 +82,24 @@ All other defaults are as defined by [confighttp].
 
 ### Queuing and batching
 
-The exporter is transitioning from its own internal batching to OpenTelemetry's standard
-queueing and batching. The below sections describe the current default and the latest
-configuration option for queueing and batching available via the `sending_queue` configuration.
-
-#### Internal batching by Elasticsearch exporter
-
-By default, the exporter will perform its own buffering and batching, as configured through the
-`flush` config. In this case both `sending_queue` and `batcher` will be unused. The exporter
-will perform its own buffering and batching and will issue async requests to Elasticsearch in
-all cases other than if any of the following conditions are met:
-
-- `sending_queue::batch` is defined (irrespective of `sending_queue` being enabled or not)
-- `batcher::enabled` is defined (set to `true` or `false`)
-
-In a future release when the `sending_queue` config is stable, and has feature parity
-with the exporter's existing `flush` config, it will be enabled by default.
-
-Using the `sending_queue` functionality provides several benefits over the default behavior:
-  - With a persistent queue, or no queue at all, `sending_queue` enables at least once delivery.
-    On the other hand, with the default behavior, the exporter will accept data and process it
-    asynchronously, which interacts poorly with queueing.
-  - By ensuring the exporter makes requests to Elasticsearch synchronously (batching disabled),
-    client metadata can be passed through to Elasticsearch requests,
-    e.g. 
by using the [`headers_setter` extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/headerssetterextension/README.md).
-
-#### Queueing and batching using sending queue
-
 The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper] which
-supports both queueing and batching. However, the sending queue is currently disabled by
-default. Sending queue can be enabled by setting `sending_queue::enabled` to `true`. The batching support in sending queue is also disabled by default. Batching can be enabled by defining `sending_queue::batch`.
+supports both queueing and batching. The default sending queue is configured to do async batching
+with the following configuration:
 
-The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings.
-
-#### Deprecated batcher config
-
-> [!WARNING]
-> The `batcher` config is now deprecated and will be removed in an upcoming version. Check the [queueing and batching](#queueing-and-batching) section for using the `sending_queue` setting that supersedes `batcher`. In the interim, `batcher` configurations are still valid, however, they will be ignored if `sending_queue::batch` is defined even if `sending_queue` is not enabled.
-
-The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/internal/queue_sender.go).
+```yaml
+sending_queue:
+  enabled: true
+  sizer: requests
+  num_consumers: 10
+  queue_size: 10
+  batch:
+    flush_timeout: 10s
+    min_size: 1e+6 # 1MB
+    max_size: 5e+6 # 5MB
+    sizer: bytes
+```
 
-- `batcher`:
-  - `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`.
-  - `sizer` (default=items): Unit of `min_size` and `max_size`. Currently supports only "items", in the future will also support "bytes".
-  - `min_size` (default=5000): Minimum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`.
-  - `max_size` (default=0): Maximum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
-  - `flush_timeout` (default=10s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer.
+These defaults are chosen to stay close to the behavior of the exporter's previous built-in batching. The [`exporterhelper` documentation][exporterhelper] provides more details on the `sending_queue` settings.
 
 ### Elasticsearch document routing
 
@@ -320,10 +292,10 @@ This can be configured through the following settings:
 
 The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents.
 The behaviour of this bulk indexing can be configured with the following settings:
 
-- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. Note this is not applicable if `batcher::enabled` is `true` or `false`.
-- `flush`: Event bulk indexer buffer flush settings
-  - `bytes` (default=5000000): Write buffer flush size limit before compression. 
A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB.
-  - `interval` (default=10s): Write buffer flush time limit.
+- `num_workers` (DEPRECATED, use `sending_queue::num_consumers` instead): Number of workers publishing bulk requests concurrently. If defined, it will be used to configure `sending_queue::num_consumers`, provided `sending_queue::num_consumers` is not explicitly defined.
+- `flush` (DEPRECATED, use `sending_queue` instead): Event bulk indexer buffer flush settings. If defined, they will be used to configure the corresponding `sending_queue` options, provided those are not explicitly defined.
+  - `bytes` (DEPRECATED, use `sending_queue::batch::max_size` instead): If defined, it will be used to configure `sending_queue::batch::max_size`, provided that is not explicitly defined. See `sending_queue::batch::max_size` for more details.
+  - `interval` (DEPRECATED, use `sending_queue::batch::flush_timeout` instead): If defined, it will be used to configure `sending_queue::batch::flush_timeout`, provided that is not explicitly defined. See `sending_queue::batch::flush_timeout` for more details.
 - `retry`: Elasticsearch bulk request retry settings
   - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
   - `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
@@ -331,9 +303,18 @@ The behaviour of this bulk indexing can be configured with the following setting
   - `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
   - `max_interval` (default=1m): Max waiting time if a HTTP request failed.
   - `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.
-
-> [!NOTE]
-> The `flush::interval` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`.
+- `sending_queue`: Configures the queueing and batching behaviour. Below are the defaults (which may differ from the standard exporterhelper defaults); for the full configuration, check the [exporterhelper docs][exporterhelper]. A migration sketch for the deprecated options follows this list.
+  - `enabled` (default=true): Enable queueing and batching behaviour.
+  - `num_consumers` (default=10): Number of consumers that dequeue batches.
+  - `wait_for_result` (default=false): If `true`, blocks incoming requests until processed.
+  - `block_on_overflow` (default=true): If `true`, blocks the request until the queue has space.
+  - `sizer` (default=requests): Unit in which the queue size is measured.
+  - `queue_size` (default=10): Maximum number of requests the queue can hold.
+  - `batch`:
+    - `flush_timeout` (default=10s): Time after which a batch is exported regardless of its size.
+    - `sizer` (default=bytes): Size batches by bytes. Note that bytes here are measured on the pdata model, not on the NDJSON documents that make up the bulk indexer requests. To address this discrepancy, the bulk indexers may also flush when their NDJSON size exceeds the configured `max_size`, since the pdata representation can be smaller than the corresponding NDJSON encoding.
+    - `min_size` (default=1MB): Minimum size of a batch.
+    - `max_size` (default=5MB): Maximum size of a batch. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large errors. It is recommended to keep this value at or below the default of 5MB.
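+
+For reference, below is a minimal migration sketch mapping the deprecated options to their `sending_queue` equivalents (the endpoint and values are illustrative, not recommendations):
+
+```yaml
+exporters:
+  elasticsearch:
+    endpoint: https://elasticsearch:9200
+    # Deprecated equivalents: num_workers: 8, flush::bytes: 5e+6, flush::interval: 10s
+    sending_queue:
+      enabled: true
+      num_consumers: 8 # replaces num_workers
+      batch:
+        sizer: bytes
+        flush_timeout: 10s # replaces flush::interval
+        max_size: 5e+6 # replaces flush::bytes
+```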
 
 #### Bulk indexing error response
 
diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go
index f3efd8fd743f3..39675576a8a45 100644
--- a/exporter/elasticsearchexporter/bulkindexer.go
+++ b/exporter/elasticsearchexporter/bulkindexer.go
@@ -20,6 +20,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configcompression"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
@@ -52,13 +53,6 @@ type bulkIndexerSession interface {
 	End()
 
 	// Flush flushes any documents added to the bulk indexing session.
-	//
-	// The behavior of Flush depends on whether the bulk indexer is
-	// synchronous or asynchronous. Calling Flush on an asynchronous bulk
-	// indexer session is effectively a no-op; flushing will be done in
-	// the background. Calling Flush on a synchronous bulk indexer session
-	// will wait for bulk indexing of added documents to complete,
-	// successfully or not.
 	Flush(context.Context) error
 }
 
@@ -70,11 +64,8 @@ func newBulkIndexer(
 	requireDataStream bool,
 	tb *metadata.TelemetryBuilder,
 	logger *zap.Logger,
-) (bulkIndexer, error) {
-	if config.Batcher.enabledSet || (config.QueueBatchConfig.Enabled && config.QueueBatchConfig.Batch.HasValue()) {
-		return newSyncBulkIndexer(client, config, requireDataStream, tb, logger), nil
-	}
-	return newAsyncBulkIndexer(client, config, requireDataStream, tb, logger)
+) bulkIndexer {
+	return newSyncBulkIndexer(client, config, requireDataStream, tb, logger)
 }
 
 func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig {
@@ -118,10 +109,17 @@ func newSyncBulkIndexer(
 	tb *metadata.TelemetryBuilder,
 	logger *zap.Logger,
 ) *syncBulkIndexer {
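+	// When the sending queue batches by bytes (measured on the pdata model),
+	// carry its max_size over as the forced-flush threshold for the bulk
+	// indexer's NDJSON buffer.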
+	var maxFlushBytes int64
+	if config.QueueBatchConfig.Batch.HasValue() {
+		batch := config.QueueBatchConfig.Batch.Get()
+		if batch.Sizer == exporterhelper.RequestSizerTypeBytes {
+			maxFlushBytes = batch.MaxSize
+		}
+	}
 	return &syncBulkIndexer{
 		config:           bulkIndexerConfig(client, config, requireDataStream),
+		maxFlushBytes:    maxFlushBytes,
 		flushTimeout:     config.Timeout,
-		flushBytes:       config.Flush.Bytes,
 		retryConfig:      config.Retry,
 		metadataKeys:     config.MetadataKeys,
 		telemetryBuilder: tb,
@@ -132,8 +130,8 @@
 
 type syncBulkIndexer struct {
 	config           docappender.BulkIndexerConfig
+	maxFlushBytes    int64
 	flushTimeout     time.Duration
-	flushBytes       int
 	retryConfig      RetrySettings
 	metadataKeys     []string
 	telemetryBuilder *metadata.TelemetryBuilder
@@ -185,9 +183,10 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index, docID, pipeline
 			getAttributesFromMetadataKeys(ctx, s.s.metadataKeys)...),
 		),
 	)
-	// flush bytes should operate on uncompressed length
-	// as Elasticsearch http.max_content_length measures uncompressed length.
-	if s.bi.UncompressedLen() >= s.s.flushBytes {
+	// sending_queue computes batch sizes from the pdata model, whereas bulk
+	// indexers operate on NDJSON. Force a flush when the uncompressed NDJSON
+	// length exceeds the configured max flush size.
+	if s.s.maxFlushBytes > 0 && int64(s.bi.UncompressedLen()) >= s.s.maxFlushBytes {
 		return s.Flush(ctx)
 	}
 	return nil
@@ -237,166 +236,6 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
 	}
 }
 
-func newAsyncBulkIndexer(
-	client esapi.Transport,
-	config *Config,
-	requireDataStream bool,
-	tb *metadata.TelemetryBuilder,
-	logger *zap.Logger,
-) (*asyncBulkIndexer, error) {
-	numWorkers := config.NumWorkers
-	if numWorkers == 0 {
-		numWorkers = runtime.NumCPU()
-	}
-
-	pool := &asyncBulkIndexer{
-		wg:               sync.WaitGroup{},
-		items:            make(chan docappender.BulkIndexerItem, config.NumWorkers),
-		telemetryBuilder: tb,
-	}
-	pool.wg.Add(numWorkers)
-
-	for i := 0; i < numWorkers; i++ {
-		bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config, requireDataStream))
-		if err != nil {
-			return nil, err
-		}
-		w := asyncBulkIndexerWorker{
-			indexer:               bi,
-			items:                 pool.items,
-			flushInterval:         config.Flush.Interval,
-			flushTimeout:          config.Timeout,
-			flushBytes:            config.Flush.Bytes,
-			telemetryBuilder:      tb,
-			logger:                logger,
-			failedDocsInputLogger: newFailedDocsInputLogger(logger, config),
-		}
-		go func() {
-			defer pool.wg.Done()
-			w.run()
-		}()
-	}
-	return pool, nil
-}
-
-type asyncBulkIndexer struct {
-	items            chan docappender.BulkIndexerItem
-	wg               sync.WaitGroup
-	telemetryBuilder *metadata.TelemetryBuilder
-}
-
-type asyncBulkIndexerSession struct {
-	*asyncBulkIndexer
-}
-
-// StartSession returns a new asyncBulkIndexerSession.
-func (a *asyncBulkIndexer) StartSession(context.Context) bulkIndexerSession {
-	return asyncBulkIndexerSession{a}
-}
-
-// Close closes the asyncBulkIndexer and any active sessions.
-func (a *asyncBulkIndexer) Close(ctx context.Context) error {
-	close(a.items)
-	doneCh := make(chan struct{})
-	go func() {
-		a.wg.Wait()
-		close(doneCh)
-	}()
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-doneCh:
-		return nil
-	}
-}
-
-// Add adds an item to the async bulk indexer session.
-//
-// Adding an item after a call to Close() will panic.
-func (s asyncBulkIndexerSession) Add(ctx context.Context, index, docID, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
-	item := docappender.BulkIndexerItem{
-		Index:            index,
-		Body:             document,
-		DocumentID:       docID,
-		DynamicTemplates: dynamicTemplates,
-		Action:           action,
-		Pipeline:         pipeline,
-	}
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case s.items <- item:
-	}
-	s.telemetryBuilder.ElasticsearchDocsReceived.Add(ctx, 1)
-	return nil
-}
-
-// End is a no-op.
-func (asyncBulkIndexerSession) End() {
-}
-
-// Flush is a no-op. 
-func (asyncBulkIndexerSession) Flush(context.Context) error { - return nil -} - -type asyncBulkIndexerWorker struct { - indexer *docappender.BulkIndexer - items <-chan docappender.BulkIndexerItem - flushInterval time.Duration - flushTimeout time.Duration - flushBytes int - - logger *zap.Logger - failedDocsInputLogger *zap.Logger - telemetryBuilder *metadata.TelemetryBuilder -} - -func (w *asyncBulkIndexerWorker) run() { - flushTick := time.NewTicker(w.flushInterval) - defer flushTick.Stop() - for { - select { - case item, ok := <-w.items: - // if channel is closed, flush and return - if !ok { - w.flush() - return - } - - if err := w.indexer.Add(item); err != nil { - w.logger.Error("error adding item to bulk indexer", zap.Error(err)) - } - - // flush bytes should operate on uncompressed length - // as Elasticsearch http.max_content_length measures uncompressed length. - if w.indexer.UncompressedLen() >= w.flushBytes { - w.flush() - flushTick.Reset(w.flushInterval) - } - case <-flushTick.C: - // bulk indexer needs to be flushed every flush interval because - // there may be pending bytes in bulk indexer buffer due to e.g. document level 429 - w.flush() - } - } -} - -func (w *asyncBulkIndexerWorker) flush() { - // TODO (lahsivjar): Should use proper context else client metadata will not be accessible - ctx := context.Background() - // ignore error as we they should be already logged and for async we don't propagate errors - _ = flushBulkIndexer( - ctx, - w.indexer, - w.flushTimeout, - nil, // async bulk indexer cannot propagate client context/metadata - w.telemetryBuilder, - w.logger, - w.failedDocsInputLogger, - ) -} - func flushBulkIndexer( ctx context.Context, bi *docappender.BulkIndexer, @@ -626,7 +465,7 @@ type bulkIndexers struct { // wg tracks active sessions wg sync.WaitGroup - // NOTE(axw) when we get rid of the async bulk indexer there would be + // NOTE(axw) we have removed async bulk indexer and there should be // no reason for having one per mode or for different document types. 
// Instead, the caller can create separate sessions as needed, and we // can either have one for required_data_stream=true and one for false, @@ -662,36 +501,20 @@ func (b *bulkIndexers) start( } for _, mode := range allowedMappingModes { - var bi bulkIndexer - bi, err = newBulkIndexer(esClient, cfg, mode == MappingOTel, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + bi := newBulkIndexer(esClient, cfg, mode == MappingOTel, b.telemetryBuilder, set.Logger) b.modes[mode] = &wgTrackingBulkIndexer{bulkIndexer: bi, wg: &b.wg} } - profilingEvents, err := newBulkIndexer(esClient, cfg, true, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingEvents := newBulkIndexer(esClient, cfg, true, b.telemetryBuilder, set.Logger) b.profilingEvents = &wgTrackingBulkIndexer{bulkIndexer: profilingEvents, wg: &b.wg} - profilingStackTraces, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingStackTraces := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) b.profilingStackTraces = &wgTrackingBulkIndexer{bulkIndexer: profilingStackTraces, wg: &b.wg} - profilingStackFrames, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingStackFrames := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) b.profilingStackFrames = &wgTrackingBulkIndexer{bulkIndexer: profilingStackFrames, wg: &b.wg} - profilingExecutables, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) - if err != nil { - return err - } + profilingExecutables := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) b.profilingExecutables = &wgTrackingBulkIndexer{bulkIndexer: profilingExecutables, wg: &b.wg} return nil } diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 5e7fee5af2868..bba013bdeb6fa 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -4,13 +4,11 @@ package elasticsearchexporter import ( - "errors" "io" "net/http" "strings" "sync/atomic" "testing" - "time" "github.com/elastic/go-docappender/v2" "github.com/elastic/go-elasticsearch/v8" @@ -18,9 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configcompression" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -63,562 +59,7 @@ const successResp = `{ ] }` -func TestAsyncBulkIndexer_flushOnClose(t *testing.T) { - cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}} - client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }}) - require.NoError(t, err) - - runBulkIndexerOnce(t, &cfg, client) -} - -func TestAsyncBulkIndexer_flush(t *testing.T) { - tests := []struct { - name string - config Config - }{ - { - name: 
"flush.bytes", - config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}}, - }, - { - name: "flush.interval", - config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: 50 * time.Millisecond, Bytes: 2 << 30}}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }}) - require.NoError(t, err) - - ct := componenttest.NewTelemetry() - tb, err := metadata.NewTelemetryBuilder( - metadatatest.NewSettings(ct).TelemetrySettings, - ) - require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(client, &tt.config, false, tb, zap.NewNop()) - require.NoError(t, err) - - session := bulkIndexer.StartSession(t.Context()) - assert.NoError(t, session.Add(t.Context(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - // should flush - time.Sleep(100 * time.Millisecond) - assert.NoError(t, session.Flush(t.Context())) - session.End() - assert.NoError(t, bulkIndexer.Close(t.Context())) - // Assert internal telemetry metrics - metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ - {Value: 1}, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ - {Value: 43}, // hard-coding the flush bytes since the input is fixed - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedBytes(t, ct, []metricdata.DataPoint[int64]{ - {Value: 43}, // hard-coding the flush bytes since the input is fixed - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) - }) - } -} - -func TestAsyncBulkIndexer_flush_error(t *testing.T) { - tests := []struct { - name string - roundTripFunc func(*http.Request) (*http.Response, error) - logFailedDocsInput bool - retrySettings RetrySettings - wantMessage string - wantFields []zap.Field - wantESBulkReqs *metricdata.DataPoint[int64] - wantESDocsProcessed *metricdata.DataPoint[int64] - wantESDocsRetried *metricdata.DataPoint[int64] - wantESLatency *metricdata.HistogramDataPoint[float64] - }{ - { - name: "500", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusInternalServerError, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader("error")), - }, nil - }, - wantMessage: "bulk indexer flush error", - wantESBulkReqs: 
&metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - semconv.HTTPResponseStatusCode(500), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - semconv.HTTPResponseStatusCode(500), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - semconv.HTTPResponseStatusCode(500), - ), - }, - }, - { - name: "429", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusTooManyRequests, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader("error")), - }, nil - }, - wantMessage: "bulk indexer flush error", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "too_many"), - semconv.HTTPResponseStatusCode(429), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "too_many"), - semconv.HTTPResponseStatusCode(429), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "too_many"), - semconv.HTTPResponseStatusCode(429), - ), - }, - }, - { - name: "429/with_retry", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":"test","status":429}}]}`)), - }, nil - }, - retrySettings: RetrySettings{Enabled: true, MaxRetries: 5, RetryOnStatus: []int{429}}, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsRetried: &metricdata.DataPoint[int64]{Value: 1}, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "500/doc_level", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":"test","status":500,"error":{"type":"internal_server_error","reason":""}}}]}`)), - }, nil - }, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_server"), - attribute.String("error.type", "internal_server_error"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "transport error", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return nil, errors.New("transport error") - }, - wantMessage: "bulk indexer flush error", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - 
Attributes: attribute.NewSet( - attribute.String("outcome", "internal_server_error"), - semconv.HTTPResponseStatusCode(http.StatusInternalServerError), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "internal_server_error"), - semconv.HTTPResponseStatusCode(http.StatusInternalServerError), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "internal_server_error"), - semconv.HTTPResponseStatusCode(http.StatusInternalServerError), - ), - }, - }, - { - name: "known version conflict error", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)), - }, nil - }, - wantMessage: "failed to index document", - wantFields: []zap.Field{zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs")}, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_client"), - attribute.String("error.type", "version_conflict_engine_exception"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "known version conflict error with logFailedDocsInput", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - `{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)), - }, nil - }, - logFailedDocsInput: true, - wantMessage: "failed to index document; input may contain sensitive data", - wantFields: []zap.Field{ - zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs"), - zap.String("input", `{"create":{"_index":"foo"}} -{"foo": "bar"} -`), - }, - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_client"), - attribute.String("error.type", "version_conflict_engine_exception"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, - { - name: "skip profiling version conflict logging", - roundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader( - 
`{"items":[{"create":{"_index":".profiling-stackframes-2024.06.01","status":400,"error":{"type":"version_conflict_engine_exception","reason":"document already exists"}}}]}`)), - }, nil - }, - wantMessage: "", - wantESBulkReqs: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(200), - ), - }, - wantESDocsProcessed: &metricdata.DataPoint[int64]{ - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "failed_client"), - attribute.String("error.type", "version_conflict_engine_exception"), - ), - }, - wantESLatency: &metricdata.HistogramDataPoint[float64]{ - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(200), - ), - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - cfg := Config{ - NumWorkers: 1, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1}, - Retry: tt.retrySettings, - MetadataKeys: []string{"x-test"}, - } - if tt.logFailedDocsInput { - cfg.LogFailedDocsInput = true - } - esClient, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: tt.roundTripFunc, - }}) - require.NoError(t, err) - core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) - - ct := componenttest.NewTelemetry() - tb, err := metadata.NewTelemetryBuilder( - metadatatest.NewSettings(ct).TelemetrySettings, - ) - require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(esClient, &cfg, false, tb, zap.New(core)) - require.NoError(t, err) - defer bulkIndexer.Close(t.Context()) - - // Client metadata are not added to the telemetry for async bulk indexer - info := client.Info{Metadata: client.NewMetadata(map[string][]string{"x-test": {"test"}})} - ctx := client.NewContext(t.Context(), info) - session := bulkIndexer.StartSession(ctx) - assert.NoError(t, session.Add(ctx, "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - // should flush - time.Sleep(100 * time.Millisecond) - if tt.wantMessage != "" { - messages := observed.FilterMessage(tt.wantMessage) - require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All()) - for _, wantField := range tt.wantFields { - assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All()) - } - } - assert.NoError(t, session.Flush(t.Context())) - session.End() - // Assert internal telemetry metrics - if tt.wantESBulkReqs != nil { - metadatatest.AssertEqualElasticsearchBulkRequestsCount( - t, ct, - []metricdata.DataPoint[int64]{*tt.wantESBulkReqs}, - metricdatatest.IgnoreTimestamp(), - ) - } - if tt.wantESDocsProcessed != nil { - metadatatest.AssertEqualElasticsearchDocsProcessed( - t, ct, - []metricdata.DataPoint[int64]{*tt.wantESDocsProcessed}, - metricdatatest.IgnoreTimestamp(), - ) - } - if tt.wantESDocsRetried != nil { - metadatatest.AssertEqualElasticsearchDocsRetried( - t, ct, - []metricdata.DataPoint[int64]{*tt.wantESDocsRetried}, - metricdatatest.IgnoreTimestamp(), - ) - } - if tt.wantESLatency != nil { - metadatatest.AssertEqualElasticsearchBulkRequestsLatency( - t, ct, - []metricdata.HistogramDataPoint[float64]{*tt.wantESLatency}, - metricdatatest.IgnoreTimestamp(), - metricdatatest.IgnoreValue(), - ) - } - }) - } -} - -func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { - tests := []struct { - name string - config Config - }{ - { - name: "compression none", - config: 
Config{ - NumWorkers: 1, - ClientConfig: confighttp.ClientConfig{Compression: "none"}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - }, - { - name: "compression gzip", - config: Config{ - NumWorkers: 1, - ClientConfig: confighttp.ClientConfig{Compression: "gzip"}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - }, - { - name: "compression gzip - level 5", - config: Config{ - NumWorkers: 1, - ClientConfig: confighttp.ClientConfig{Compression: "gzip", CompressionParams: configcompression.CompressionParams{Level: 5}}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - loggerCore, logObserver := observer.New(zap.DebugLevel) - - esLogger := clientLogger{ - Logger: zap.New(loggerCore), - logRequestBody: true, - logResponseBody: true, - } - - client, err := elasticsearch.NewClient(elasticsearch.Config{ - Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }, - Logger: &esLogger, - }) - require.NoError(t, err) - - runBulkIndexerOnce(t, &tt.config, client) - - records := logObserver.AllUntimed() - require.Len(t, records, 1) - - assert.Equal(t, "/_bulk", records[0].ContextMap()["path"]) - assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[0].ContextMap()["request_body"]) - assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string)) - }) - } -} - -func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer { - ct := componenttest.NewTelemetry() - tb, err := metadata.NewTelemetryBuilder( - metadatatest.NewSettings(ct).TelemetrySettings, - ) - require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(client, config, false, tb, zap.NewNop()) - require.NoError(t, err) - - session := bulkIndexer.StartSession(t.Context()) - assert.NoError(t, session.Add(t.Context(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - assert.NoError(t, session.Flush(t.Context())) - session.End() - assert.NoError(t, bulkIndexer.Close(t.Context())) - // Assert internal telemetry metrics - metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - semconv.HTTPResponseStatusCode(http.StatusOK), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ - {Value: 1}, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("outcome", "success"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ - {Value: 43}, // hard-coding the flush bytes since the input is fixed - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualElasticsearchFlushedBytes( - t, ct, - []metricdata.DataPoint[int64]{{}}, - metricdatatest.IgnoreTimestamp(), - metricdatatest.IgnoreValue(), // compression can change in test, ignore value - ) - - return bulkIndexer -} - -func TestSyncBulkIndexer_flushBytes(t *testing.T) { +func 
TestSyncBulkIndexer(t *testing.T) { tests := []struct { name string responseBody string @@ -641,8 +82,9 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var reqCnt atomic.Int64 cfg := Config{ - NumWorkers: 1, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1}, + QueueBatchConfig: exporterhelper.QueueBatchConfig{ + NumConsumers: 1, + }, MetadataKeys: []string{"x-test"}, } esClient, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ @@ -672,8 +114,9 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { ctx := client.NewContext(t.Context(), info) session := bi.StartSession(ctx) assert.NoError(t, session.Add(ctx, "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) - assert.Equal(t, int64(1), reqCnt.Load()) + assert.Equal(t, int64(0), reqCnt.Load()) // requests will not flush unless flush is called explicitly assert.NoError(t, session.Flush(ctx)) + assert.Equal(t, int64(1), reqCnt.Load()) session.End() assert.NoError(t, bi.Close(ctx)) @@ -744,129 +187,10 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { } func TestNewBulkIndexer(t *testing.T) { - for _, tc := range []struct { - name string - config map[string]any - expectSyncBulkIndexer bool - }{ - { - name: "batcher_enabled_unset", - config: map[string]any{ - "batcher": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: false, - }, - { - name: "batcher_enabled=true", - config: map[string]any{ - "batcher": map[string]any{ - "enabled": true, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "batcher_enabled=true", - config: map[string]any{ - "batcher": map[string]any{ - "enabled": false, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "sending_queue_enabled_without_batcher", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - }, - }, - expectSyncBulkIndexer: false, - }, - { - name: "sending_queue__with_batch_enabled", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - "batch": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "sending_queue_disabled_but_batch_configured", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": false, - "batch": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - "batcher": map[string]any{ - // no enabled set - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: false, - }, - { - name: "sending_queue_overrides_batcher", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - "batch": map[string]any{ - "min_size": 100, - "max_size": 200, - }, - }, - "batcher": map[string]any{ - "enabled": true, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - { - name: "sending_queue_without_batch_with_batcher_enabled", - config: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - }, - "batcher": map[string]any{ - "enabled": true, - "min_size": 100, - "max_size": 200, - }, - }, - expectSyncBulkIndexer: true, - }, - } { - t.Run(tc.name, func(t *testing.T) { - client, err := elasticsearch.NewDefaultClient() - require.NoError(t, err) - cfg := createDefaultConfig() - cm := confmap.NewFromStringMap(tc.config) - require.NoError(t, cm.Unmarshal(cfg)) - - bi, err := newBulkIndexer(client, cfg.(*Config), true, nil, nil) - require.NoError(t, 
err)
-			t.Cleanup(func() { bi.Close(t.Context()) })
+	client, err := elasticsearch.NewDefaultClient()
+	require.NoError(t, err)
+	cfg := createDefaultConfig()
 
-			_, ok := bi.(*syncBulkIndexer)
-			assert.Equal(t, tc.expectSyncBulkIndexer, ok)
-		})
-	}
+	bi := newBulkIndexer(client, cfg.(*Config), true, nil, nil)
+	t.Cleanup(func() { bi.Close(t.Context()) })
 }
diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go
index b1ba346b0097e..677b30168c05e 100644
--- a/exporter/elasticsearchexporter/config.go
+++ b/exporter/elasticsearchexporter/config.go
@@ -41,6 +41,10 @@ type Config struct {
 	CloudID string `mapstructure:"cloudid"`
 
 	// NumWorkers configures the number of workers publishing bulk requests.
+	//
+	// Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::num_consumers`
+	// instead. If this config is defined and `sending_queue::num_consumers` is not explicitly
+	// defined, it will be used to set `sending_queue::num_consumers`.
 	NumWorkers int `mapstructure:"num_workers"`
 
 	// LogsIndex configures the static index used for document routing for logs.
@@ -74,9 +78,13 @@ type Config struct {
 	Authentication AuthenticationSettings `mapstructure:",squash"`
 	Discovery      DiscoverySettings      `mapstructure:"discover"`
 	Retry          RetrySettings          `mapstructure:"retry"`
-	Flush          FlushSettings          `mapstructure:"flush"`
-	Mapping        MappingsSettings       `mapstructure:"mapping"`
-	LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
+
+	// Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::batch` instead.
+	// If this config is defined, it will be used to configure the sending queue's batch settings,
+	// provided those are not explicitly defined.
+	Flush          FlushSettings          `mapstructure:"flush"`
+	Mapping        MappingsSettings       `mapstructure:"mapping"`
+	LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
 
 	// TelemetrySettings contains settings useful for testing/debugging purposes.
 	// This is experimental and may change at any time.
@@ -97,18 +105,6 @@ type Config struct {
 	// Users are expected to sanitize the responses themselves.
 	IncludeSourceOnError *bool `mapstructure:"include_source_on_error"`
 
-	// Batcher holds configuration for batching requests based on timeout
-	// and size-based thresholds.
-	//
-	// Batcher is unused by default, in which case Flush will be used.
-	// If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified),
-	// then the Flush will be ignored even if Batcher.Enabled is false.
-	//
-	// Deprecated: [v0.132.0] This config is now deprecated. Use `sending_queue::batch` instead.
-	// Batcher config will be ignored if `sending_queue::batch` is defined even if sending queue
-	// is disabled.
-	Batcher BatcherConfig `mapstructure:"batcher"`
-
 	// Experimental: MetadataKeys defines a list of client.Metadata keys that
 	// will be used as partition keys for when batcher is enabled and will be
 	// added to the exporter's telemetry if defined. The config only applies
@@ -122,31 +118,6 @@ type Config struct {
 	MetadataKeys []string `mapstructure:"metadata_keys"`
 }
 
-// BatcherConfig holds configuration for exporterbatcher.
-//
-// This is a slightly modified version of exporterbatcher.Config,
-// to enable tri-state Enabled: unset, false, true. 
-type BatcherConfig struct {
-	Enabled      bool                            `mapstructure:"enabled"`
-	FlushTimeout time.Duration                   `mapstructure:"flush_timeout"`
-	Sizer        exporterhelper.RequestSizerType `mapstructure:"sizer"`
-	MinSize      int64                           `mapstructure:"min_size"`
-	MaxSize      int64                           `mapstructure:"max_size"`
-
-	// enabledSet tracks whether Enabled has been specified.
-	// If enabledSet is false, the exporter will perform its
-	// own buffering.
-	enabledSet bool `mapstructure:"-"`
-}
-
-func (c *BatcherConfig) Unmarshal(conf *confmap.Conf) error {
-	if err := conf.Unmarshal(c); err != nil {
-		return err
-	}
-	c.enabledSet = conf.IsSet("enabled")
-	return nil
-}
-
 type TelemetrySettings struct {
 	LogRequestBody  bool `mapstructure:"log_request_body"`
 	LogResponseBody bool `mapstructure:"log_response_body"`
@@ -233,11 +204,24 @@ type DiscoverySettings struct {
 // FlushSettings defines settings for configuring the write buffer flushing
 // policy in the Elasticsearch exporter. The exporter sends a bulk request with
 // all events already serialized into the send-buffer.
+//
+// Deprecated: [v0.136.0] This config is now deprecated. Use `sending_queue::batch` instead.
+// If this config is defined, it will be used to configure the sending queue's batch settings,
+// provided those are not explicitly defined.
 type FlushSettings struct {
 	// Bytes sets the send buffer flushing limit.
+	// Bytes is now deprecated. Use `sending_queue::batch::{min, max}_size` with the `bytes`
+	// sizer to configure batching based on bytes.
+	//
+	// If this config option is defined, it will be used to configure `sending_queue::batch::max_size`,
+	// provided that is not explicitly defined.
 	Bytes int `mapstructure:"bytes"`
 
 	// Interval configures the max age of a document in the send buffer.
+	// Interval is now deprecated. Use `sending_queue::batch::flush_timeout` instead.
+	//
+	// If this config option is defined, it will be used to configure `sending_queue::batch::flush_timeout`,
+	// provided that is not explicitly defined.
 	Interval time.Duration `mapstructure:"interval"`
 
 	// prevent unkeyed literal initialization
@@ -325,6 +309,25 @@ var (
 
 const defaultElasticsearchEnvName = "ELASTICSEARCH_URL"
 
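+// Unmarshal implements confmap.Unmarshaler. Besides decoding the config, it
+// maps the deprecated `num_workers` and `flush` settings onto their
+// `sending_queue` equivalents when those are not explicitly set.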
+func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
+	if err := conf.Unmarshal(cfg); err != nil {
+		return err
+	}
+	if !conf.IsSet("sending_queue::num_consumers") && conf.IsSet("num_workers") {
+		cfg.QueueBatchConfig.NumConsumers = cfg.NumWorkers
+	}
+	if cfg.QueueBatchConfig.Batch.HasValue() {
+		qbCfg := cfg.QueueBatchConfig.Batch.Get()
+		if !conf.IsSet("sending_queue::batch::flush_timeout") && conf.IsSet("flush::interval") {
+			qbCfg.FlushTimeout = cfg.Flush.Interval
+		}
+		if !conf.IsSet("sending_queue::batch::max_size") && conf.IsSet("flush::bytes") {
+			qbCfg.MaxSize = int64(cfg.Flush.Bytes)
+		}
+	}
+	return nil
+}
+
 // Validate validates the elasticsearch server configuration.
 func (cfg *Config) Validate() error {
 	endpoints, err := cfg.endpoints()
@@ -502,11 +505,11 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
 	if cfg.TracesDynamicIndex.Enabled {
 		logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.")
 	}
-	switch {
-	case cfg.Batcher.enabledSet && cfg.QueueBatchConfig.Batch.HasValue():
-		logger.Warn("batcher::enabled and sending_queue::batch both have been set, sending_queue::batch will take preference.")
-	case cfg.Batcher.enabledSet:
-		logger.Warn("batcher has been deprecated, and will be removed in a future version. Use sending_queue instead.")
+	if cfg.Flush.Bytes > 0 || cfg.Flush.Interval > 0 {
+		logger.Warn("flush settings are now deprecated and are only used as a fallback for `sending_queue::batch` settings. Use `sending_queue` instead.")
+	}
+	if cfg.NumWorkers > 0 {
+		logger.Warn("num_workers is now deprecated and is only used as a fallback for `sending_queue::num_consumers`. Use `sending_queue` instead.")
 	}
 }
diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go
index 755227ff7f8f7..ed0b76c17f7ad 100644
--- a/exporter/elasticsearchexporter/config_test.go
+++ b/exporter/elasticsearchexporter/config_test.go
@@ -54,16 +54,25 @@ func TestConfig(t *testing.T) {
 			expected:   defaultCfg,
 		},
 		{
-			id:         component.NewIDWithName(metadata.Type, "trace"),
 			configFile: "config.yaml",
+			id:         component.NewIDWithName(metadata.Type, "trace"),
 			expected: &Config{
 				QueueBatchConfig: exporterhelper.QueueBatchConfig{
-					Enabled:      false,
-					NumConsumers: exporterhelper.NewDefaultQueueConfig().NumConsumers,
-					QueueSize:    exporterhelper.NewDefaultQueueConfig().QueueSize,
-					Sizer:        exporterhelper.RequestSizerTypeRequests,
+					Enabled:         true,
+					NumConsumers:    10,
+					QueueSize:       10,
+					BlockOnOverflow: true,
+					Sizer:           exporterhelper.RequestSizerTypeRequests,
+					Batch: configoptional.Some(exporterhelper.BatchConfig{
+						FlushTimeout: 10 * time.Second,
+						Sizer:        exporterhelper.RequestSizerTypeBytes,
+						MinSize:      1000000,
+						MaxSize:      5000000,
+					}),
+				},
+				Endpoints: []string{
+					"https://elastic.example.com:9200",
 				},
-				Endpoints: []string{"https://elastic.example.com:9200"},
 				LogsDynamicIndex: DynamicIndexSetting{
 					Enabled: false,
 				},
@@ -85,12 +94,11 @@
 					cfg.Timeout = 2 * time.Minute
 					cfg.MaxIdleConns = defaultMaxIdleConns
 					cfg.IdleConnTimeout = defaultIdleConnTimeout
-					cfg.Headers = map[string]configopaque.String{
-						"myheader": "test",
-					}
+					cfg.Headers = map[string]configopaque.String{"myheader": "test"}
 					cfg.Compression = defaultCompression
 					cfg.CompressionParams.Level = gzip.BestSpeed
-				}),
+				},
+				),
 				Authentication: AuthenticationSettings{
 					User:     "elastic",
 					Password: "search",
@@ -99,31 +107,31 @@
 				Discovery: DiscoverySettings{
 					OnStart: true,
 				},
-				Flush: FlushSettings{
-					Bytes:    10485760,
-					Interval: 10 * time.Second,
-				},
 				Retry: RetrySettings{
 					Enabled:         true,
 					MaxRetries:      5,
 					InitialInterval: 100 * time.Millisecond,
 					MaxInterval:     1 * time.Minute,
-					RetryOnStatus:   []int{http.StatusTooManyRequests, http.StatusInternalServerError},
+					RetryOnStatus: []int{
+						http.StatusTooManyRequests,
+						http.StatusInternalServerError,
+					},
 				},
 				Mapping: MappingsSettings{
-					Mode:         "otel",
-					AllowedModes: []string{"bodymap", "ecs", "none", "otel", "raw"},
+					Mode: "otel",
+					AllowedModes: []string{
+						"bodymap",
+						"ecs",
+						"none",
+						"otel",
+						"raw",
+					},
 				},
 				LogstashFormat: LogstashFormatSettings{
 					Enabled:         false,
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
-				Batcher: BatcherConfig{
-					FlushTimeout: 10 * time.Second,
-					Sizer:        exporterhelper.RequestSizerTypeItems,
-					MinSize:      defaultBatcherMinSizeItems,
-				},
 				TelemetrySettings: TelemetrySettings{
 					LogFailedDocsInputRateLimit: time.Second,
 				},
@@ -134,10 +142,17 @@
 			configFile: "config.yaml",
 			expected: &Config{
 				QueueBatchConfig: 
exporterhelper.QueueBatchConfig{ - Enabled: true, - NumConsumers: exporterhelper.NewDefaultQueueConfig().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, - Sizer: exporterhelper.RequestSizerTypeRequests, + Enabled: true, + NumConsumers: 10, + QueueSize: 10, + BlockOnOverflow: true, + Sizer: exporterhelper.RequestSizerTypeRequests, + Batch: configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + Sizer: exporterhelper.RequestSizerTypeBytes, + MinSize: 1000000, + MaxSize: 5000000, + }), }, Endpoints: []string{"http://localhost:9200"}, LogsIndex: "my_log_index", @@ -175,10 +190,6 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, - Interval: 10 * time.Second, - }, Retry: RetrySettings{ Enabled: true, MaxRetries: 5, @@ -195,11 +206,6 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, TelemetrySettings: TelemetrySettings{ LogFailedDocsInputRateLimit: time.Second, }, @@ -210,10 +216,17 @@ func TestConfig(t *testing.T) { configFile: "config.yaml", expected: &Config{ QueueBatchConfig: exporterhelper.QueueBatchConfig{ - Enabled: true, - NumConsumers: exporterhelper.NewDefaultQueueConfig().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, - Sizer: exporterhelper.RequestSizerTypeRequests, + Enabled: true, + NumConsumers: 10, + QueueSize: 10, + BlockOnOverflow: true, + Sizer: exporterhelper.RequestSizerTypeRequests, + Batch: configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + Sizer: exporterhelper.RequestSizerTypeBytes, + MinSize: 1000000, + MaxSize: 5000000, + }), }, Endpoints: []string{"http://localhost:9200"}, LogsDynamicIndex: DynamicIndexSetting{ @@ -251,10 +264,6 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, - Interval: 10 * time.Second, - }, Retry: RetrySettings{ Enabled: true, MaxRetries: 5, @@ -271,11 +280,6 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, TelemetrySettings: TelemetrySettings{ LogFailedDocsInputRateLimit: time.Second, }, @@ -305,16 +309,6 @@ func TestConfig(t *testing.T) { cfg.Endpoint = "https://elastic.example.com:9200" }), }, - { - id: component.NewIDWithName(metadata.Type, "batcher_disabled"), - configFile: "config.yaml", - expected: withDefaultConfig(func(cfg *Config) { - cfg.Endpoint = "https://elastic.example.com:9200" - - cfg.Batcher.Enabled = false - cfg.Batcher.enabledSet = true - }), - }, { id: component.NewIDWithName(metadata.Type, "compression_none"), configFile: "config.yaml", @@ -334,50 +328,87 @@ func TestConfig(t *testing.T) { }), }, { - id: component.NewIDWithName(metadata.Type, "batcher_minmax_size"), + id: component.NewIDWithName(metadata.Type, "include_source_on_error"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - - cfg.Batcher.MinSize = 100 - cfg.Batcher.MaxSize = 200 + includeSource := true + cfg.IncludeSourceOnError = &includeSource }), }, { - id: component.NewIDWithName(metadata.Type, "include_source_on_error"), + id: 
component.NewIDWithName(metadata.Type, "metadata_keys"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - includeSource := true - cfg.IncludeSourceOnError = &includeSource + + cfg.MetadataKeys = []string{"x-test-1", "x-test-2"} }), }, { - id: component.NewIDWithName(metadata.Type, "metadata_keys"), + id: component.NewIDWithName(metadata.Type, "sendingqueue_disabled"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - cfg.MetadataKeys = []string{"x-test-1", "x-test-2"} + cfg.QueueBatchConfig.Enabled = false }), }, { - id: component.NewIDWithName(metadata.Type, "queuebatch_enabled"), + id: component.NewIDWithName(metadata.Type, "sendingqueue_enabled"), configFile: "config.yaml", expected: withDefaultConfig(func(cfg *Config) { cfg.Endpoint = "https://elastic.example.com:9200" - cfg.QueueBatchConfig.Enabled = true cfg.QueueBatchConfig.NumConsumers = 100 - cfg.QueueBatchConfig.Sizer = exporterhelper.RequestSizerTypeItems cfg.QueueBatchConfig.Batch = configoptional.Some( exporterhelper.BatchConfig{ Sizer: exporterhelper.RequestSizerTypeItems, - FlushTimeout: 10 * time.Second, + FlushTimeout: time.Second, + MinSize: 1000, + MaxSize: 5000, }, ) }), }, + { + id: component.NewIDWithName(metadata.Type, "backward_compat_for_deprecated_cfgs/new_config_takes_priority"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + + cfg.NumWorkers = 11 + cfg.Flush = FlushSettings{ + Bytes: 1001, + Interval: 11 * time.Second, + } + cfg.QueueBatchConfig.NumConsumers = 111 + // QueueBatchConfig is set by default + qbCfg := cfg.QueueBatchConfig.Batch.Get() + qbCfg.FlushTimeout = 111 * time.Second + qbCfg.MaxSize = 1_000_001 + qbCfg.Sizer = exporterhelper.RequestSizerTypeBytes + }), + }, + { + id: component.NewIDWithName(metadata.Type, "backward_compat_for_deprecated_cfgs/fallback_to_old_cfg"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + + cfg.NumWorkers = 11 + cfg.Flush = FlushSettings{ + Bytes: 1_000_001, + Interval: 11 * time.Second, + } + cfg.QueueBatchConfig.NumConsumers = 11 + // QueueBatchConfig is set by default + qbCfg := cfg.QueueBatchConfig.Batch.Get() + qbCfg.FlushTimeout = 11 * time.Second + qbCfg.MaxSize = 1_000_001 + qbCfg.Sizer = exporterhelper.RequestSizerTypeBytes + }), + }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index f1e4fc5cebccc..8c4b580398511 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -631,9 +631,9 @@ func TestExporterLogs(t *testing.T) { cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 5 * time.Millisecond - // use sync bulk indexer - cfg.Batcher.Enabled = false - cfg.Batcher.enabledSet = true + // use sync flushing + cfg.QueueBatchConfig.Enabled = true + cfg.QueueBatchConfig.WaitForResult = true }) logs := plog.NewLogs() @@ -675,8 +675,10 @@ func TestExporterLogs(t *testing.T) { cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 5 * time.Millisecond - // use async bulk indexer - cfg.Batcher.enabledSet = false + // use async indexer + cfg.QueueBatchConfig.Enabled = true + cfg.QueueBatchConfig.WaitForResult = false + cfg.QueueBatchConfig.BlockOnOverflow = 
false }) mustSendLogRecords(t, exporter, plog.NewLogRecord()) // as sync bulk indexer is used, retries are not guaranteed to finish @@ -822,7 +824,6 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "otel" - cfg.Flush.Interval = 50 * time.Millisecond cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 10 * time.Millisecond }) @@ -867,12 +868,12 @@ func TestExporterLogs(t *testing.T) { cfgs := map[string]func(*Config){ "async": func(cfg *Config) { - cfg.Batcher.enabledSet = false + cfg.QueueBatchConfig.WaitForResult = false + cfg.QueueBatchConfig.BlockOnOverflow = false }, "sync": func(cfg *Config) { - cfg.Batcher.enabledSet = true - cfg.Batcher.Enabled = true - cfg.Batcher.FlushTimeout = 10 * time.Millisecond + cfg.QueueBatchConfig.WaitForResult = true + cfg.QueueBatchConfig.BlockOnOverflow = false }, } for _, tt := range tableTests { @@ -943,11 +944,12 @@ func TestExporterLogs(t *testing.T) { cfgs := map[string]func(*Config){ "async": func(cfg *Config) { - cfg.Batcher.Enabled = false + cfg.QueueBatchConfig.WaitForResult = false + cfg.QueueBatchConfig.BlockOnOverflow = false }, "sync": func(cfg *Config) { - cfg.Batcher.Enabled = true - cfg.Batcher.FlushTimeout = 10 * time.Millisecond + cfg.QueueBatchConfig.WaitForResult = true + cfg.QueueBatchConfig.BlockOnOverflow = false }, } for _, tt := range tableTests { @@ -1000,11 +1002,16 @@ func TestExporterMetrics(t *testing.T) { exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "ecs" }) - dp := pmetric.NewNumberDataPoint() - dp.SetDoubleValue(123.456) - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - mustSendMetricSumDataPoints(t, exporter, dp) - mustSendMetricGaugeDataPoints(t, exporter, dp) + dpSum := pmetric.NewNumberDataPoint() + dpSum.SetDoubleValue(123.456) + dpSum.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-2 * time.Second))) + mustSendMetricSumDataPoints(t, exporter, dpSum) + + dpGauge := pmetric.NewNumberDataPoint() + dpGauge.SetDoubleValue(123.456) + // Keep timestamp different to avoid metric grouping putting them in same doc + dpGauge.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second))) + mustSendMetricGaugeDataPoints(t, exporter, dpGauge) rec.WaitItems(2) }) @@ -2435,7 +2442,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) { }) logs := createLogs(tc.scopes...) - exporter := newTestLogsExporter(t, server.URL, setAllowedMappingModes) + exporter := newTestLogsExporter(t, server.URL, setAllowedMappingModes, func(cfg *Config) { + // Set wait_for_result to be true so that errors are reported directly via Consume* + cfg.QueueBatchConfig.WaitForResult = true + }) err := exporter.ConsumeLogs(tc.ctx, logs) if tc.expectErr != "" { require.EqualError(t, err, tc.expectErr) @@ -2460,7 +2470,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) { }) metrics := createMetrics(tc.scopes...) 
- exporter := newTestMetricsExporter(t, server.URL, setAllowedMappingModes) + exporter := newTestMetricsExporter(t, server.URL, setAllowedMappingModes, func(cfg *Config) { + // Set wait_for_result to be true so that errors are reported directly via Consume* + cfg.QueueBatchConfig.WaitForResult = true + }) err := exporter.ConsumeMetrics(tc.ctx, metrics) if tc.expectErr != "" { require.EqualError(t, err, tc.expectErr) @@ -2478,7 +2491,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) { t.Run("profiles", func(t *testing.T) { // Profiles are only supported by otel mode, so just verify that // the metadata is picked up and invalid modes are rejected. - exporter := newTestProfilesExporter(t, "https://testing.invalid", setAllowedMappingModes) + exporter := newTestProfilesExporter(t, "https://testing.invalid", setAllowedMappingModes, func(cfg *Config) { + // Set wait_for_result to be true so that errors are reported directly via Consume* + cfg.QueueBatchConfig.WaitForResult = true + }) err := exporter.ConsumeProfiles(noneContext, pprofile.NewProfiles()) assert.EqualError(t, err, `Permanent error: invalid context mapping mode: unsupported mapping mode "none", expected one of ["ecs" "otel"]`, @@ -2499,7 +2515,10 @@ func TestExporter_DynamicMappingMode(t *testing.T) { }) traces := createTraces(tc.scopes...) - exporter := newTestTracesExporter(t, server.URL, setAllowedMappingModes) + exporter := newTestTracesExporter(t, server.URL, setAllowedMappingModes, func(cfg *Config) { + // Set wait_for_result to be true so that errors are reported directly via Consume* + cfg.QueueBatchConfig.WaitForResult = true + }) err := exporter.ConsumeTraces(tc.ctx, traces) if tc.expectErr != "" { require.EqualError(t, err, tc.expectErr) @@ -2548,14 +2567,14 @@ func TestExporterBatcher(t *testing.T) { var requests []*http.Request testauthID := component.NewID(component.MustNewType("authtest")) exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) { - cfg.Batcher = BatcherConfig{ - Enabled: false, - // sync bulk indexer is used without batching + cfg.QueueBatchConfig.Enabled = true + cfg.QueueBatchConfig.WaitForResult = true + cfg.QueueBatchConfig.Batch = configoptional.Some(exporterhelper.BatchConfig{ FlushTimeout: 200 * time.Millisecond, Sizer: exporterhelper.RequestSizerTypeItems, MinSize: 8192, - enabledSet: true, - } + MaxSize: 10000, + }) cfg.Auth = configoptional.Some(configauth.Config{AuthenticatorID: testauthID}) cfg.Retry.Enabled = false }) @@ -2590,8 +2609,9 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor f := NewFactory() cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} - cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond + cfg.QueueBatchConfig.NumConsumers = 1 + // Batch is configured by default so we can directly edit flush timeout + cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) 
require.NoError(t, xconfmap.Validate(cfg)) exp, err := f.CreateTraces(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg) @@ -2609,8 +2629,9 @@ func newTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xex f := NewFactory().(xexporter.Factory) cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} - cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond + cfg.QueueBatchConfig.NumConsumers = 1 + // Batch is configured by default so we can directly edit flush timeout + cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) require.NoError(t, xconfmap.Validate(cfg)) exp, err := f.CreateProfiles(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg) @@ -2628,8 +2649,9 @@ func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) expo f := NewFactory() cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} - cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond + cfg.QueueBatchConfig.NumConsumers = 1 + // Batch is configured by default so we can directly edit flush timeout + cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) require.NoError(t, xconfmap.Validate(cfg)) exp, err := f.CreateMetrics(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg) @@ -2657,8 +2679,9 @@ func newUnstartedTestLogsExporter(t *testing.T, url string, fns ...func(*Config) f := NewFactory() cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} - cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond + cfg.QueueBatchConfig.NumConsumers = 1 + // Batch is configured by default so we can directly edit flush timeout + cfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond }}, fns...)...) 
require.NoError(t, xconfmap.Validate(cfg)) exp, err := f.CreateLogs(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg) @@ -2677,7 +2700,9 @@ func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.Lo } func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { - logs.MarkReadOnly() + if !exporter.Capabilities().MutatesData { + logs.MarkReadOnly() + } err := exporter.ConsumeLogs(t.Context(), logs) require.NoError(t, err) } @@ -2685,10 +2710,10 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) { metrics := pmetric.NewMetrics() scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetEmptySum() + metric.SetName("sum") for _, dataPoint := range dataPoints { - metric := scopeMetrics.Metrics().AppendEmpty() - metric.SetEmptySum() - metric.SetName("sum") dataPoint.CopyTo(metric.Sum().DataPoints().AppendEmpty()) } mustSendMetrics(t, exporter, metrics) @@ -2697,17 +2722,19 @@ func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPo func mustSendMetricGaugeDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) { metrics := pmetric.NewMetrics() scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("gauge") for _, dataPoint := range dataPoints { - metric := scopeMetrics.Metrics().AppendEmpty() - metric.SetEmptyGauge() - metric.SetName("gauge") dataPoint.CopyTo(metric.Gauge().DataPoints().AppendEmpty()) } mustSendMetrics(t, exporter, metrics) } func mustSendMetrics(t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) { - metrics.MarkReadOnly() + if !exporter.Capabilities().MutatesData { + metrics.MarkReadOnly() + } err := exporter.ConsumeMetrics(t.Context(), metrics) require.NoError(t, err) } @@ -2723,7 +2750,9 @@ func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span) } func mustSendTraces(t *testing.T, exporter exporter.Traces, traces ptrace.Traces) { - traces.MarkReadOnly() + if !exporter.Capabilities().MutatesData { + traces.MarkReadOnly() + } err := exporter.ConsumeTraces(t.Context(), traces) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index fe80832a57ca9..724eb965d6b5f 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configoptional" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" @@ -26,8 +25,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) -var defaultBatcherMinSizeItems = int64(5000) - // NewFactory creates a factory for Elastic exporter. 
func NewFactory() exporter.Factory { return xexporter.NewFactory( @@ -42,7 +39,14 @@ func NewFactory() exporter.Factory { func createDefaultConfig() component.Config { qs := exporterhelper.NewDefaultQueueConfig() - qs.Enabled = false + qs.QueueSize = 10 + qs.BlockOnOverflow = true + qs.Batch = configoptional.Some(exporterhelper.BatchConfig{ + FlushTimeout: 10 * time.Second, + MinSize: 1e+6, + MaxSize: 5e+6, + Sizer: exporterhelper.RequestSizerTypeBytes, + }) httpClientConfig := confighttp.NewDefaultClientConfig() httpClientConfig.Timeout = 90 * time.Second @@ -83,15 +87,6 @@ func createDefaultConfig() component.Config { LogFailedDocsInputRateLimit: time.Second, }, IncludeSourceOnError: nil, - Batcher: BatcherConfig{ - FlushTimeout: 10 * time.Second, - Sizer: exporterhelper.RequestSizerTypeItems, - MinSize: defaultBatcherMinSizeItems, - }, - Flush: FlushSettings{ - Bytes: 5e+6, - Interval: 10 * time.Second, - }, } } @@ -220,53 +215,13 @@ func exporterhelperOptions( shutdown component.ShutdownFunc, qbs xexporterhelper.QueueBatchSettings, ) []exporterhelper.Option { - opts := []exporterhelper.Option{ - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + // not setting capabilities as they will default to non-mutating but will be updated + // by the base-exporter to mutating if batching is enabled. + return []exporterhelper.Option{ exporterhelper.WithStart(start), exporterhelper.WithShutdown(shutdown), - } - qbc := cfg.QueueBatchConfig - switch { - case qbc.Batch.HasValue(): - // Latest queue batch settings are used, prioritize them even if sending queue is disabled - opts = append(opts, xexporterhelper.WithQueueBatch(qbc, qbs)) - + xexporterhelper.WithQueueBatch(cfg.QueueBatchConfig, qbs), // Effectively disable timeout_sender because timeout is enforced in bulk indexer. - // - // We keep timeout_sender enabled in the async mode (sending_queue not enabled OR sending - // queue enabled but batching not enabled OR based on the deprecated batcher setting), to - // ensure sending data to the background workers will not block indefinitely. - if qbc.Enabled { - opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0})) - } - case cfg.Batcher.enabledSet: - if cfg.Batcher.Enabled { - qbc.Batch = configoptional.Some(exporterhelper.BatchConfig{ - FlushTimeout: cfg.Batcher.FlushTimeout, - MinSize: cfg.Batcher.MinSize, - MaxSize: cfg.Batcher.MaxSize, - Sizer: cfg.Batcher.Sizer, - }) - - // If the deprecated batcher is enabled without a queue, enable blocking queue to replicate the - // behavior of the deprecated batcher. - if !qbc.Enabled { - qbc.Enabled = true - qbc.WaitForResult = true - } - } - - opts = append( - opts, - // Effectively disable timeout_sender because timeout is enforced in bulk indexer. - // - // We keep timeout_sender enabled in the async mode (Batcher.Enabled == nil), - // to ensure sending data to the background workers will not block indefinitely. 
- exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), - exporterhelper.WithQueue(qbc), - ) - default: - opts = append(opts, exporterhelper.WithQueue(qbc)) + exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), } - return opts } diff --git a/exporter/elasticsearchexporter/factory_test.go b/exporter/elasticsearchexporter/factory_test.go index 29dd9d1293d1e..70625599fb1b8 100644 --- a/exporter/elasticsearchexporter/factory_test.go +++ b/exporter/elasticsearchexporter/factory_test.go @@ -116,17 +116,6 @@ var signalTestCases = []struct { "endpoints": []string{"http://test:9200"}, }, }, - { - name: "with_deprecated_batcher", - cfg: map[string]any{ - "batcher": map[string]any{ - "enabled": true, - }, - }, - expectedLogs: []string{ - "batcher has been deprecated", - }, - }, { name: "with_sending_queue", cfg: map[string]any{ @@ -136,33 +125,4 @@ var signalTestCases = []struct { }, }, }, - { - name: "with_sending_queue_disabled_and_deprecated_batcher", - cfg: map[string]any{ - "sending_queue": map[string]any{ - "batch": map[string]any{}, - }, - "batcher": map[string]any{ - "enabled": true, - }, - }, - expectedLogs: []string{ - "sending_queue::batch will take preference", - }, - }, - { - name: "with_sending_queue_enabled_and_deprecated_batcher", - cfg: map[string]any{ - "sending_queue": map[string]any{ - "enabled": true, - "batch": map[string]any{}, - }, - "batcher": map[string]any{ - "enabled": true, - }, - }, - expectedLogs: []string{ - "sending_queue::batch will take preference", - }, - }, } diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 5e26849631901..fdca63ae72277 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -26,7 +26,6 @@ require ( go.opentelemetry.io/collector/config/configoptional v1.43.0 go.opentelemetry.io/collector/confmap v1.43.0 go.opentelemetry.io/collector/confmap/xconfmap v0.137.0 - go.opentelemetry.io/collector/consumer v1.43.0 go.opentelemetry.io/collector/consumer/consumererror v0.137.0 go.opentelemetry.io/collector/exporter v1.43.0 go.opentelemetry.io/collector/exporter/exporterhelper v0.137.0 @@ -88,6 +87,7 @@ require ( go.opentelemetry.io/collector/config/configmiddleware v1.43.0 // indirect go.opentelemetry.io/collector/config/configretry v1.43.0 // indirect go.opentelemetry.io/collector/config/configtls v1.43.0 // indirect + go.opentelemetry.io/collector/consumer v1.43.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.137.0 // indirect go.opentelemetry.io/collector/consumer/consumertest v0.137.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.137.0 // indirect diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index e186f8175faa6..89cbe02933edf 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -67,7 +67,7 @@ type esDataReceiver struct { receiver receiver.Logs endpoint string decodeBulkRequest bool - batcherEnabled *bool + enableBatching bool t testing.TB } @@ -92,9 +92,9 @@ func withDecodeBulkRequest(decode bool) dataReceiverOption { } } -func withBatcherEnabled(enabled bool) dataReceiverOption { +func withBatching(enabled bool) dataReceiverOption { return func(r *esDataReceiver) { - r.batcherEnabled = &enabled + r.enableBatching = enabled } } @@ -153,9 +153,6 @@ func (es *esDataReceiver) GenConfigYAMLStr() 
string { logs_index: %s metrics_index: %s traces_index: %s - sending_queue: - enabled: true - block_on_overflow: true mapping: mode: otel retry: @@ -169,17 +166,24 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex, ) - if es.batcherEnabled == nil { + if es.enableBatching { cfgFormat += ` - flush: - interval: 1s` + sending_queue: + enabled: true + block_on_overflow: true + batch: + flush_timeout: 1s + sizer: bytes` } else { - cfgFormat += fmt.Sprintf(` - batcher: - flush_timeout: 1s - enabled: %v`, - *es.batcherEnabled, - ) + // Batching is effectively disabled by setting `min_size` to 0, since + // batching is enabled in the default configuration. + cfgFormat += ` + sending_queue: + enabled: true + block_on_overflow: true + batch: + min_size: 0 + sizer: bytes` } return cfgFormat + "\n" } diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 4842cefa6bc84..f47a4ccf30c48 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -171,8 +171,9 @@ func prepareBenchmark( cfg.esCfg.LogsIndex = TestLogsIndex cfg.esCfg.MetricsIndex = TestMetricsIndex cfg.esCfg.TracesIndex = TestTracesIndex - cfg.esCfg.Flush.Interval = 10 * time.Millisecond - cfg.esCfg.NumWorkers = 1 + // sending_queue::batch is configured by default so we can directly edit flush timeout + cfg.esCfg.QueueBatchConfig.Batch.Get().FlushTimeout = 10 * time.Millisecond + cfg.esCfg.QueueBatchConfig.NumConsumers = 1 tc, err := consumer.NewTraces(func(context.Context, ptrace.Traces) error { return nil diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index ec02955ca79fc..82e1917b0d154 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -22,11 +22,7 @@ func TestExporter(t *testing.T) { for _, tc := range []struct { name string - // batcherEnabled enables/disables the batch sender. If this is - nil, then the exporter buffers data itself (legacy behavior), - whereas if it is non-nil then the exporter will not perform - any buffering itself. - batcherEnabled *bool + enableBatching bool // restartCollector restarts the OTEL collector. 
Restarting // the collector allows durability testing of the ES exporter // under failure conditions. restartCollector bool mockESErr errElasticsearch }{ {name: "basic"}, {name: "es_intermittent_http_error", mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, {name: "es_intermittent_doc_error", mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, - {name: "batcher_enabled", batcherEnabled: ptrTo(true)}, - {name: "batcher_enabled_es_intermittent_http_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, - {name: "batcher_enabled_es_intermittent_doc_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, - {name: "batcher_disabled", batcherEnabled: ptrTo(false)}, - {name: "batcher_disabled_es_intermittent_http_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, - {name: "batcher_disabled_es_intermittent_doc_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, + {name: "batching_enabled", enableBatching: true}, + {name: "batching_enabled_es_intermittent_http_error", enableBatching: true, mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, + {name: "batching_enabled_es_intermittent_doc_error", enableBatching: true, mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, + {name: "batching_disabled", enableBatching: false}, + {name: "batching_disabled_es_intermittent_http_error", enableBatching: false, mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}}, + {name: "batching_disabled_es_intermittent_doc_error", enableBatching: false, mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}}, /* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed {name: "collector_restarts", restartCollector: true}, @@ -51,11 +47,7 @@ func TestExporter(t *testing.T) { */ } { t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) { - var opts []dataReceiverOption - if tc.batcherEnabled != nil { - opts = append(opts, withBatcherEnabled(*tc.batcherEnabled)) - } - runner(t, eventType, tc.restartCollector, tc.mockESErr, opts...) 
+ runner(t, eventType, tc.restartCollector, tc.mockESErr, withBatching(tc.enableBatching)) }) } } @@ -156,7 +148,3 @@ func runner(t *testing.T, eventType string, restartCollector bool, mockESErr err ) tc.ValidateData() } - -func ptrTo[T any](t T) *T { - return &t -} diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 9235f670ec048..f79ae0c8fe556 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -20,8 +20,6 @@ elasticsearch/trace: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 retry: max_retries: 5 retry_on_status: @@ -47,15 +45,11 @@ elasticsearch/metric: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 retry: max_retries: 5 retry_on_status: - 429 - 500 - sending_queue: - enabled: true elasticsearch/log: tls: insecure: false @@ -76,15 +70,11 @@ elasticsearch/log: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 retry: max_retries: 5 retry_on_status: - 429 - 500 - sending_queue: - enabled: true elasticsearch/logstash_format: endpoints: [http://localhost:9200] logstash_format: @@ -97,21 +87,12 @@ elasticsearch/cloudid: cloudid: foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY= elasticsearch/confighttp_endpoint: endpoint: https://elastic.example.com:9200 -elasticsearch/batcher_disabled: - endpoint: https://elastic.example.com:9200 - batcher: - enabled: false elasticsearch/compression_none: endpoint: https://elastic.example.com:9200 compression: none elasticsearch/compression_gzip: endpoint: https://elastic.example.com:9200 compression: gzip -elasticsearch/batcher_minmax_size: - endpoint: https://elastic.example.com:9200 - batcher: - min_size: 100 - max_size: 200 elasticsearch/include_source_on_error: endpoint: https://elastic.example.com:9200 include_source_on_error: true @@ -120,11 +101,50 @@ elasticsearch/metadata_keys: metadata_keys: - x-test-1 - x-test-2 -elasticsearch/queuebatch_enabled: +elasticsearch/sendingqueue_disabled: + endpoint: https://elastic.example.com:9200 + # Setting sending queue without `batch::sizer` can have unintended + # side effects. See https://github.com/open-telemetry/opentelemetry-collector/issues/13860 + sending_queue: + enabled: false + batch: + sizer: bytes +elasticsearch/sendingqueue_enabled: endpoint: https://elastic.example.com:9200 sending_queue: enabled: true - sizer: "items" + sizer: requests num_consumers: 100 batch: - flush_timeout: 10s + flush_timeout: 1s + sizer: items + min_size: 1000 + max_size: 5000 +elasticsearch/backward_compat_for_deprecated_cfgs/new_config_takes_priority: + endpoint: https://elastic.example.com:9200 + # Should be ignored and left as-is + num_workers: 11 + flush: + interval: 11s + bytes: 1001 + # Should take precedence + sending_queue: + enabled: true + sizer: requests + num_consumers: 111 + batch: + flush_timeout: 111s + max_size: 1_000_001 + sizer: bytes +elasticsearch/backward_compat_for_deprecated_cfgs/fallback_to_old_cfg: + endpoint: https://elastic.example.com:9200 + # Should be used to set sending_queue config + num_workers: 11 + flush: + interval: 11s + bytes: 1_000_001 + sending_queue: + enabled: true + sizer: requests + batch: + sizer: bytes
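
For reference, the `backward_compat_for_deprecated_cfgs` test cases above imply a mapping from the deprecated `num_workers` and `flush` settings onto `sending_queue`. The sketch below is inferred from those tests and uses placeholder values; it is an illustrative example, not an official migration table.

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    # Deprecated settings (pre-migration):
    # num_workers: 8
    # flush:
    #   bytes: 5000000
    #   interval: 10s
    # Roughly equivalent sending_queue settings (post-migration):
    sending_queue:
      enabled: true
      num_consumers: 8        # was num_workers
      batch:
        sizer: bytes
        flush_timeout: 10s    # was flush::interval
        max_size: 5000000     # was flush::bytes
```

As those test cases exercise, an explicitly configured `sending_queue` takes priority, while `sending_queue` fields left unset appear to fall back to the corresponding deprecated values.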