diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java
new file mode 100644
index 0000000000000..2c9b7417b2832
--- /dev/null
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
+import org.elasticsearch.action.bulk.IncrementalBulkService;
+import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.action.datastreams.GetDataStreamAction;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.AbstractRefCounted;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexingPressure;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.telemetry.Measurement;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
+import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+public class FailureStoreMetricsWithIncrementalBulkIT extends ESIntegTestCase {
+
+    private static final List<String> METRICS = List.of(
+        FailureStoreMetrics.METRIC_TOTAL,
+        FailureStoreMetrics.METRIC_FAILURE_STORE,
+        FailureStoreMetrics.METRIC_REJECTED
+    );
+
+    private static final String DATA_STREAM_NAME = "data-stream-incremental";
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(DataStreamsPlugin.class, TestTelemetryPlugin.class, MapperExtrasPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        // Tiny split watermarks so that even small bulks are chunked by the incremental bulk service
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
+            .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
+            .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "2KB")
+            .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
+            .build();
+    }
+
+    public void testShortCircuitFailure() throws Exception {
+        createDataStreamWithFailureStore();
+
+        String coordinatingOnlyNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+
+        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, coordinatingOnlyNode);
+        try (IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest()) {
+
+            AtomicBoolean nextRequested = new AtomicBoolean(true);
+            int successfullyStored = 0;
+            while (nextRequested.get()) {
+                nextRequested.set(false);
+                refCounted.incRef();
+                handler.addItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, () -> nextRequested.set(true));
+                successfullyStored++;
+            }
+            assertBusy(() -> assertTrue(nextRequested.get()));
+            var metrics = collectTelemetry();
+            assertDataStreamMetric(metrics, FailureStoreMetrics.METRIC_TOTAL, DATA_STREAM_NAME, successfullyStored);
+            assertDataStreamMetric(metrics, FailureStoreMetrics.METRIC_FAILURE_STORE, DATA_STREAM_NAME, 0);
+            assertDataStreamMetric(metrics, FailureStoreMetrics.METRIC_REJECTED, DATA_STREAM_NAME, 0);
+
+            // Introduce artificial pressure that will reject the following requests
+            String node = findNodeOfPrimaryShard(DATA_STREAM_NAME);
+            IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
+            long memoryLimit = primaryPressure.stats().getMemoryLimit();
+            long primaryRejections = primaryPressure.stats().getPrimaryRejections();
+            try (Releasable ignored = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
+                while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
+                    while (nextRequested.get()) {
+                        nextRequested.set(false);
+                        refCounted.incRef();
+                        List<DocWriteRequest<?>> requests = new ArrayList<>();
+                        for (int i = 0; i < 20; ++i) {
+                            requests.add(indexRequest(DATA_STREAM_NAME));
+                        }
+                        handler.addItems(requests, refCounted::decRef, () -> nextRequested.set(true));
+                    }
+                    assertBusy(() -> assertTrue(nextRequested.get()));
+                }
+            }
+
+            while (nextRequested.get()) {
+                nextRequested.set(false);
+                refCounted.incRef();
+                handler.addItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, () -> nextRequested.set(true));
+            }
+
+            assertBusy(() -> assertTrue(nextRequested.get()));
+
+            PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+            handler.lastItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, future);
+
+            BulkResponse bulkResponse = safeGet(future);
+
+            for (int i = 0; i < bulkResponse.getItems().length; ++i) {
+                // the first successfullyStored requests succeeded; everything after hit the artificial rejection
+                boolean hasFailed = i >= successfullyStored;
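+                // Rejected items are reported as failures but are not redirected to the failure store (see the
+                // EsRejectedExecutionException exclusion in BulkOperation below), so every item, failed or not,
+                // reports NOT_APPLICABLE_OR_UNKNOWN as its failure store status.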
+                assertThat(bulkResponse.getItems()[i].isFailed(), is(hasFailed));
+                assertThat(bulkResponse.getItems()[i].getFailureStoreStatus(), is(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN));
+            }
+
+            metrics = collectTelemetry();
+            assertDataStreamMetric(metrics, FailureStoreMetrics.METRIC_TOTAL, DATA_STREAM_NAME, bulkResponse.getItems().length);
+            assertDataStreamMetric(
+                metrics,
+                FailureStoreMetrics.METRIC_REJECTED,
+                DATA_STREAM_NAME,
+                bulkResponse.getItems().length - successfullyStored
+            );
+            assertDataStreamMetric(metrics, FailureStoreMetrics.METRIC_FAILURE_STORE, DATA_STREAM_NAME, 0);
+        }
+    }
+
+    private void createDataStreamWithFailureStore() throws IOException {
+        TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(
+            "template-incremental"
+        );
+        request.indexTemplate(
+            ComposableIndexTemplate.builder()
+                .indexPatterns(List.of(DATA_STREAM_NAME + "*"))
+                .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
+                .template(new Template(null, new CompressedXContent("""
+                    {
+                      "dynamic": false,
+                      "properties": {
+                        "@timestamp": {
+                          "type": "date"
+                        },
+                        "count": {
+                          "type": "long"
+                        }
+                      }
+                    }"""), null))
+                .build()
+        );
+        assertAcked(safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request)));
+
+        final var createDataStreamRequest = new CreateDataStreamAction.Request(
+            TEST_REQUEST_TIMEOUT,
+            TEST_REQUEST_TIMEOUT,
+            DATA_STREAM_NAME
+        );
+        assertAcked(safeGet(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)));
+    }
+
+    private static Map<String, List<Measurement>> collectTelemetry() {
+        Map<String, List<Measurement>> measurements = new HashMap<>();
+        for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) {
+            final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();
+
+            telemetryPlugin.collect();
+
+            for (String metricName : METRICS) {
+                measurements.put(metricName, telemetryPlugin.getLongCounterMeasurement(metricName));
+            }
+        }
+        return measurements;
+    }
+
+    private void assertDataStreamMetric(Map<String, List<Measurement>> metrics, String metric, String dataStreamName, int expectedValue) {
+        List<Measurement> measurements = metrics.get(metric);
+        assertThat(measurements, notNullValue());
+        long totalValue = measurements.stream()
+            .filter(m -> m.attributes().get("data_stream").equals(dataStreamName))
+            .mapToLong(Measurement::getLong)
+            .sum();
+        assertThat(totalValue, equalTo((long) expectedValue));
+    }
+
+    private static IndexRequest indexRequest(String dataStreamName) {
+        String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
+        String value = "1";
+        return new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
+            .source(Strings.format("{\"%s\":\"%s\", \"count\": %s}", DEFAULT_TIMESTAMP_FIELD, time, value), XContentType.JSON);
+    }
+
+    protected static String findNodeOfPrimaryShard(String dataStreamName) {
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
+            TEST_REQUEST_TIMEOUT,
+            new String[] { dataStreamName }
+        );
+        GetDataStreamAction.Response getDataStreamResponse = safeGet(client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest));
+        assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+        DataStream dataStream = getDataStreamResponse.getDataStreams().getFirst().getDataStream();
+        assertThat(dataStream.getName(), equalTo(DATA_STREAM_NAME));
+        assertThat(dataStream.getIndices().size(), equalTo(1));
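+        // Resolve the single backing index, then scan each node's IndicesService for its active primary shard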
+        String backingIndex = dataStream.getIndices().getFirst().getName();
+        assertThat(backingIndex, backingIndexEqualTo(DATA_STREAM_NAME, 1));
+
+        Index index = resolveIndex(backingIndex);
+        int shardId = 0;
+        for (String node : internalCluster().getNodeNames()) {
+            var indicesService = internalCluster().getInstance(IndicesService.class, node);
+            IndexService indexService = indicesService.indexService(index);
+            if (indexService != null) {
+                IndexShard shard = indexService.getShardOrNull(shardId);
+                if (shard != null && shard.isActive() && shard.routingEntry().primary()) {
+                    return node;
+                }
+            }
+        }
+        throw new AssertionError("IndexShard instance not found for shard " + new ShardId(index, shardId));
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
index ce3e189149451..ad1fda2534fab 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
@@ -42,6 +42,7 @@
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
@@ -543,7 +544,8 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
         var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
         if (isFailureStoreRequest == false
             && failureStoreCandidate.isFailureStoreEnabled()
-            && error instanceof VersionConflictEngineException == false) {
+            && error instanceof VersionConflictEngineException == false
+            && error instanceof EsRejectedExecutionException == false) {
             // Prepare the data stream failure store if necessary
             maybeMarkFailureStoreForRollover(failureStoreCandidate);
@@ -563,8 +565,8 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
             }
         } else {
             // If we can't redirect to a failure store (because either the data stream doesn't have the failure store enabled
-            // or this request was already targeting a failure store), or this was a version conflict we increment the
-            // rejected counter.
+            // or this request was already targeting a failure store), or this was an error that is not eligible for the
+            // failure store, such as a version conflict or a load rejection, we increment the rejected counter.
             failureStoreMetrics.incrementRejected(
                 bulkItemRequest.index(),
                 errorType,