Skip to content

Commit 3adfae3

Browse files
jxie-1elasticsearchmachine
andauthored
Add IT to retry failed reindex operation (#132681)
* Add IT to retry failed reindex operation * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent b28801a commit 3adfae3

File tree

1 file changed

+140
-0
lines changed

1 file changed

+140
-0
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.reindex;
11+
12+
import org.elasticsearch.ElasticsearchException;
13+
import org.elasticsearch.action.ActionFuture;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.ActionRequest;
16+
import org.elasticsearch.action.ActionResponse;
17+
import org.elasticsearch.action.bulk.BulkItemResponse;
18+
import org.elasticsearch.action.support.ActionFilter;
19+
import org.elasticsearch.action.support.ActionFilterChain;
20+
import org.elasticsearch.plugins.ActionPlugin;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.reindex.ReindexPlugin;
23+
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.junit.Before;
26+
27+
import java.util.Arrays;
28+
import java.util.Collection;
29+
import java.util.List;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.IntStream;
34+
35+
import static java.util.Collections.singletonList;
36+
import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
37+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
38+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
39+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.empty;
42+
import static org.hamcrest.Matchers.not;
43+
44+
/**
45+
* Tests retrying a failed reindex operation
46+
*/
47+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
48+
public class RetryFailedReindexIT extends ESIntegTestCase {
49+
private static final String INDEX = "source-index";
50+
private static final String DEST_INDEX = "dest-index";
51+
private static final int NUM_DOCS = 100;
52+
private static final int NUM_PARTIAL_DOCS = 70;
53+
private static final AtomicBoolean FILTER_ENABLED = new AtomicBoolean(false);
54+
private static final AtomicInteger DOC_COUNT = new AtomicInteger(0);
55+
56+
@Override
57+
protected Collection<Class<? extends Plugin>> nodePlugins() {
58+
return Arrays.asList(ReindexPlugin.class, TestPlugin.class);
59+
}
60+
61+
@Before
62+
public void reset() {
63+
FILTER_ENABLED.set(false);
64+
DOC_COUNT.set(0);
65+
}
66+
67+
public void testRetryFailedReindex() throws Exception {
68+
createIndex(INDEX);
69+
indexRandom(
70+
true,
71+
false,
72+
true,
73+
IntStream.range(0, NUM_DOCS)
74+
.mapToObj(i -> prepareIndex(INDEX).setId(Integer.toString(i)).setSource("n", Integer.toString(i)))
75+
.collect(Collectors.toList())
76+
);
77+
assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), NUM_DOCS);
78+
79+
// Fail reindex and end up in partial state
80+
FILTER_ENABLED.set(true);
81+
assertFutureThrows(reindex(true), TestException.class);
82+
FILTER_ENABLED.set(false);
83+
84+
// Run into conflicts with partial destination index
85+
assertResponse(reindex(true), res -> {
86+
assertThat(res.getBulkFailures(), not(empty()));
87+
for (BulkItemResponse.Failure failure : res.getBulkFailures()) {
88+
assertThat(failure.getMessage(), containsString("VersionConflictEngineException: ["));
89+
}
90+
});
91+
92+
// Bypass conflicts and complete reindex
93+
assertResponse(reindex(false), res -> { assertThat(res.getBulkFailures(), empty()); });
94+
assertBusy(() -> { assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), NUM_DOCS); });
95+
}
96+
97+
private ActionFuture<BulkByScrollResponse> reindex(boolean abortOnVersionConflict) {
98+
ReindexRequestBuilder builder = new ReindexRequestBuilder(internalCluster().client());
99+
builder.source(INDEX).destination(DEST_INDEX).abortOnVersionConflict(abortOnVersionConflict);
100+
builder.source().setSize(1);
101+
builder.destination().setOpType(CREATE);
102+
return builder.execute();
103+
}
104+
105+
private static class TestException extends ElasticsearchException {
106+
TestException() {
107+
super("Injected index failure");
108+
}
109+
}
110+
111+
public static class TestPlugin extends Plugin implements ActionPlugin {
112+
@Override
113+
public List<ActionFilter> getActionFilters() {
114+
return singletonList(new ActionFilter() {
115+
@Override
116+
public int order() {
117+
return Integer.MIN_VALUE;
118+
}
119+
120+
@Override
121+
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
122+
Task task,
123+
String action,
124+
Request request,
125+
ActionListener<Response> listener,
126+
ActionFilterChain<Request, Response> chain
127+
) {
128+
if (FILTER_ENABLED.get()
129+
&& action.equals("indices:data/write/bulk")
130+
&& DOC_COUNT.incrementAndGet() > NUM_PARTIAL_DOCS) {
131+
listener.onFailure(new TestException());
132+
} else {
133+
chain.proceed(task, action, request, listener);
134+
}
135+
}
136+
137+
});
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)