diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/RetryFailedReindexIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/RetryFailedReindexIT.java new file mode 100644 index 0000000000000..053767a6466b2 --- /dev/null +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/RetryFailedReindexIT.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; + +/** + * Tests retrying a failed reindex operation + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class RetryFailedReindexIT extends ESIntegTestCase { + private static final String INDEX = "source-index"; + private static final String DEST_INDEX = "dest-index"; + private static final int NUM_DOCS = 100; + private static final int NUM_PARTIAL_DOCS = 70; + private static final AtomicBoolean FILTER_ENABLED = new AtomicBoolean(false); + private static final AtomicInteger DOC_COUNT = new AtomicInteger(0); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ReindexPlugin.class, TestPlugin.class); + } + + @Before + public void reset() { + FILTER_ENABLED.set(false); + DOC_COUNT.set(0); + } + + public void testRetryFailedReindex() throws Exception { + createIndex(INDEX); + indexRandom( + true, + false, + true, + IntStream.range(0, NUM_DOCS) + .mapToObj(i -> prepareIndex(INDEX).setId(Integer.toString(i)).setSource("n", Integer.toString(i))) + .collect(Collectors.toList()) + ); + assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), NUM_DOCS); + + // Fail reindex and end up in partial state + FILTER_ENABLED.set(true); + assertFutureThrows(reindex(true), TestException.class); + FILTER_ENABLED.set(false); + + // Run into conflicts with partial destination index + assertResponse(reindex(true), res -> { + assertThat(res.getBulkFailures(), not(empty())); + for (BulkItemResponse.Failure failure : res.getBulkFailures()) { + assertThat(failure.getMessage(), containsString("VersionConflictEngineException: [")); + } + }); + + // Bypass conflicts and complete reindex + assertResponse(reindex(false), res -> { assertThat(res.getBulkFailures(), empty()); }); + assertBusy(() -> { assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), NUM_DOCS); }); + } + + private ActionFuture reindex(boolean abortOnVersionConflict) { + ReindexRequestBuilder builder = new ReindexRequestBuilder(internalCluster().client()); + builder.source(INDEX).destination(DEST_INDEX).abortOnVersionConflict(abortOnVersionConflict); + builder.source().setSize(1); + builder.destination().setOpType(CREATE); + return builder.execute(); + } + + private static class TestException extends ElasticsearchException { + TestException() { + super("Injected index failure"); + } + } + + public static class TestPlugin extends Plugin implements ActionPlugin { + @Override + public List getActionFilters() { + return singletonList(new ActionFilter() { + @Override + public int order() { + return Integer.MIN_VALUE; + } + + @Override + public void apply( + Task task, + String action, + Request request, + ActionListener listener, + ActionFilterChain chain + ) { + if (FILTER_ENABLED.get() + && action.equals("indices:data/write/bulk") + && DOC_COUNT.incrementAndGet() > NUM_PARTIAL_DOCS) { + listener.onFailure(new TestException()); + } else { + chain.proceed(task, action, request, listener); + } + } + + }); + } + } +}