Merged
Changes from 10 commits
@@ -0,0 +1,283 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class FailureStoreMetricsWithIncrementalBulkIT extends ESIntegTestCase {

private static final List<String> METRICS = List.of(
FailureStoreMetrics.METRIC_TOTAL,
FailureStoreMetrics.METRIC_FAILURE_STORE,
FailureStoreMetrics.METRIC_REJECTED
);

private String dataStream = "data-stream-incremental";
private String template = "template-incremental";
Contributor

Could we convert these to constants? They look like they are.


@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, TestTelemetryPlugin.class, MapperExtrasPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "2KB")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
.build();
}

public void testShortCircuitFailure() throws Exception {
putComposableIndexTemplate(true);
createDataStream();

String coordinatingOnlyNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, coordinatingOnlyNode);
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

AtomicBoolean nextRequested = new AtomicBoolean(true);
AtomicLong hits = new AtomicLong(0);
while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(dataStream)), refCounted::decRef, () -> nextRequested.set(true));
hits.incrementAndGet();
}
assertBusy(() -> assertTrue(nextRequested.get()));
var measurements = collectTelemetry();
assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), (int) hits.get(), dataStream);
assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());

GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStream));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().size(), equalTo(1));
String backingIndex = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStream, 1));

String node = findShard(resolveIndex(backingIndex), 0);
IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
long memoryLimit = primaryPressure.stats().getMemoryLimit();
long primaryRejections = primaryPressure.stats().getPrimaryRejections();
try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
List<DocWriteRequest<?>> requests = new ArrayList<>();
for (int i = 0; i < 20; ++i) {
requests.add(indexRequest(dataStream));
}
handler.addItems(requests, refCounted::decRef, () -> nextRequested.set(true));
}
assertBusy(() -> assertTrue(nextRequested.get()));
}
}

while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(dataStream)), refCounted::decRef, () -> nextRequested.set(true));
}

assertBusy(() -> assertTrue(nextRequested.get()));

PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest(dataStream)), refCounted::decRef, future);

BulkResponse bulkResponse = safeGet(future);

/* The data stream should have a failure store associated with it now */
getDataStreamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" });
getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet();
Index failureStoreIndex = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureStoreWriteIndex();
String failureStoreIndexName = failureStoreIndex.getName();
String failure_node = findShard(resolveIndex(failureStoreIndexName), 0);
boolean shardsOnDifferentNodes = node.equals(failure_node) == false;

for (int i = 0; i < hits.get(); ++i) {
assertFalse(bulkResponse.getItems()[i].isFailed());
assertTrue(bulkResponse.getItems()[i].getFailureStoreStatus().getLabel().equalsIgnoreCase("NOT_APPLICABLE_OR_UNKNOWN"));
}

int docs_redirected_to_fs = 0;
Contributor

Java variables should be camel case. docsRedirectedToFs

int docs_in_fs = 0;
Contributor

Java variables should be camel case. docsInFS

Contributor Author

@ankikuma ankikuma Nov 7, 2024

Oops. Done.

for (int i = (int) hits.get(); i < bulkResponse.getItems().length; ++i) {
BulkItemResponse item = bulkResponse.getItems()[i];
if (item.isFailed()) {
Contributor

Just so I understand this correctly:

If an indexing operation fails and there is a failure store configured, Elasticsearch will then try to store that failure in the failure store. If that operation succeeds, then the indexing operation is indicated to the user as "successful"?

As long as that is the expected behavior this mitigation looks good to me.

Contributor Author

Yes, your understanding is correct, @Tim-Brooks. If we successfully index into the failure store, the indexing operation is considered successful, even though the operation failed to index into the original index.
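
For context, a small sketch of how that semantics shows up on the response side, reusing only the accessors this test already calls (the "USED" label is the value the test asserts for documents that ended up in the failure store):

    for (BulkItemResponse item : bulkResponse.getItems()) {
        // A document redirected into the failure store still reports isFailed() == false;
        // the redirection is only visible through the failure store status label.
        if (item.isFailed() == false
            && "USED".equalsIgnoreCase(item.getFailureStoreStatus().getLabel())) {
            // Indexing into the backing index failed, but the failure document was
            // stored, so the item is reported to the caller as successful.
        }
    }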

Contributor

Hi @ankikuma & @Tim-Brooks,

As long as that is the expected behavior this mitigation looks good to me.

I do not think this is expected behaviour.

I am afraid I misled @ankikuma during our last chat.

If we successfully index into the failure store, the indexing operation is considered successful, even though the operation failed to index into the original index.

This is correct, and it explains the behaviour that the code exhibits. But if I remember the conversations we looked into correctly, the purpose of the failure store is to store failures that result from user misconfiguration, not from technical limitations or failures.

Considering this, I would expect that this type of failure should not be redirected to the failure store and that it should result in a failed response.

As a way forward, I see two options depending on the scope of this work:

  • If the purpose of this test is to check that incremental indexing and failure store work as expected, I would say that we need to fix the bug that this test has unearthed.
  • If the purpose is to only add a test to cover the failure store and incremental bulk indexing working together, we should write the test to work as expected and then open a bug and mute this test.

I would prefer the first, but I do not fully know the scope of this work, so I would like to offer an alternative as well.

Contributor Author

@ankikuma ankikuma Nov 8, 2024

@gmarouli The scope of this test was just to test the interaction of incremental bulk failures caused by the short-circuit feature with the failure store, based on the comment from @nielsbauman here. And in this original comment, it looks like we expected the short-circuited requests to go to the failure store.

However, this quote from James Baiera indicates that if failures occur due to resource constraints, they should not go to the failure store:

I think if we reject requests due to resource constraints that’s ok, since the failure store is not meant to be a dead letter queue - it’s a best effort storage location for documents that cannot be ingested because there is some kind of fault in their shape or content.
For instance, if the failure store index on a data stream is not allocated, we simply reject the document, nothing to be done. If there’s no memory to execute a write, or if there is no thread capacity, there’s nothing we can do

Now, a short-circuit failure is triggered by a previous failure; it just so happens that in this test we simulate that failure via indexing pressure. I am not sure how one would distinguish between failures caused by resource constraints and other types of failures (for the benefit of the failure store).

Contributor

If I am not mistaken, the exception thrown is EsRejectedExecutionException, right? In this case we can extend the conditions at the point where we determine whether a request should be redirected to the failure store. See:

if (isFailureStoreRequest == false
    && failureStoreCandidate.isFailureStoreEnabled()
    && error instanceof VersionConflictEngineException == false) {

If you agree, I think it's worth the effort because right now the assertions are much more complex than they need to be. I could also give it a go if you want, and we can ask @jbaiera to review. Would you feel more comfortable with that approach?
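
To make that concrete, a minimal sketch of the extended condition, assuming we simply add a guard for the rejection exception (variable names are taken from the snippet above; whether the error needs to be unwrapped first is an assumption, since this test observes the EsRejectedExecutionException nested one level down in the cause chain):

    // Sketch only: do not redirect resource-constraint rejections to the failure store.
    if (isFailureStoreRequest == false
        && failureStoreCandidate.isFailureStoreEnabled()
        && error instanceof VersionConflictEngineException == false
        && ExceptionsHelper.unwrapCause(error) instanceof EsRejectedExecutionException == false) {
        // redirect the failed document to the failure store
    } else {
        // surface the failure to the caller as a failed bulk item
    }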

assertFalse(shardsOnDifferentNodes);
assertThat(item.getFailure().getCause().getCause(), instanceOf(EsRejectedExecutionException.class));
assertTrue(item.getFailureStoreStatus().getLabel().equalsIgnoreCase("FAILED"));
docs_redirected_to_fs++;
} else {
assertTrue(shardsOnDifferentNodes);
assertTrue(item.getFailureStoreStatus().getLabel().equalsIgnoreCase("USED"));
docs_in_fs++;
}
}
measurements = collectTelemetry();
assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), bulkResponse.getItems().length, dataStream);
assertEquals(bulkResponse.getItems().length - hits.get(), measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
assertEquals(docs_redirected_to_fs, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
}

private void createDataStream() {
final var createDataStreamRequest = new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStream);
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());
}

private static Map<String, List<Measurement>> collectTelemetry() {
Map<String, List<Measurement>> measurements = new HashMap<>();
for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) {
final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();

telemetryPlugin.collect();

for (String metricName : METRICS) {
measurements.put(metricName, telemetryPlugin.getLongCounterMeasurement(metricName));
}
}
return measurements;
}

private void assertMeasurements(List<Measurement> measurements, int expectedSize, String expectedDataStream) {
assertMeasurements(measurements, expectedSize, expectedDataStream, (Consumer<Measurement>) null);
}

private void assertMeasurements(
List<Measurement> measurements,
int expectedSize,
String expectedDataStream,
FailureStoreMetrics.ErrorLocation location
) {
assertMeasurements(
measurements,
expectedSize,
expectedDataStream,
measurement -> assertEquals(location.name(), measurement.attributes().get("error_location"))
);
}

private void assertMeasurements(
List<Measurement> measurements,
int expectedSize,
String expectedDataStream,
Consumer<Measurement> customAssertion
) {
assertEquals(expectedSize, measurements.size());
for (Measurement measurement : measurements) {
assertEquals(expectedDataStream, measurement.attributes().get("data_stream"));
if (customAssertion != null) {
customAssertion.accept(measurement);
}
}
}

private static IndexRequest indexRequest(String dataStream) {
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
String value = "1";
IndexRequest indexRequest = new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(Strings.format("{\"%s\":\"%s\", \"count\": %s}", DEFAULT_TIMESTAMP_FIELD, time, value), XContentType.JSON);
return indexRequest;
}

private void putComposableIndexTemplate(boolean failureStore) throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(template);
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, failureStore))
.template(new Template(null, new CompressedXContent("""
{
"dynamic": false,
"properties": {
"@timestamp": {
"type": "date"
},
"count": {
"type": "long"
}
}
}"""), null))
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
}

protected static String findShard(Index index, int shardId) {
for (String node : internalCluster().getNodeNames()) {
var indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
if (indexService != null) {
IndexShard shard = indexService.getShardOrNull(shardId);
if (shard != null && shard.isActive() && shard.routingEntry().primary()) {
return node;
}
}
}
throw new AssertionError("IndexShard instance not found for shard " + new ShardId(index, shardId));
}
}