Skip to content

Commit a84c466

Browse files
authored
Fail the reindex data stream task if any document fails to reindex (#121591) (#121677)
1 parent 4d83a17 commit a84c466

File tree

3 files changed

+177
-8
lines changed

3 files changed

+177
-8
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
2626
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
2727
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
28+
import org.elasticsearch.action.bulk.BulkItemResponse;
2829
import org.elasticsearch.action.search.SearchRequest;
2930
import org.elasticsearch.action.support.ActionFilters;
3031
import org.elasticsearch.action.support.HandledTransportAction;
@@ -44,6 +45,7 @@
4445
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4546
import org.elasticsearch.index.reindex.ReindexAction;
4647
import org.elasticsearch.index.reindex.ReindexRequest;
48+
import org.elasticsearch.index.reindex.ScrollableHitSource;
4749
import org.elasticsearch.injection.guice.Inject;
4850
import org.elasticsearch.search.builder.SearchSourceBuilder;
4951
import org.elasticsearch.tasks.Task;
@@ -275,7 +277,34 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
275277
reindexRequest.setParentTask(parentTaskId);
276278
reindexRequest.setRequestsPerSecond(clusterService.getClusterSettings().get(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING));
277279
reindexRequest.setSlices(0); // equivalent to slices=auto in rest api
278-
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
280+
// Since we delete the source index on success, we want to fail the whole job if there are _any_ documents that fail to reindex:
281+
ActionListener<BulkByScrollResponse> checkForFailuresListener = ActionListener.wrap(bulkByScrollResponse -> {
282+
if (bulkByScrollResponse.getSearchFailures().isEmpty() == false) {
283+
ScrollableHitSource.SearchFailure firstSearchFailure = bulkByScrollResponse.getSearchFailures().get(0);
284+
listener.onFailure(
285+
new ElasticsearchException(
286+
"Failure reading data from {} caused by {}",
287+
firstSearchFailure.getReason(),
288+
sourceIndexName,
289+
firstSearchFailure.getReason().getMessage()
290+
)
291+
);
292+
} else if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
293+
BulkItemResponse.Failure firstBulkFailure = bulkByScrollResponse.getBulkFailures().get(0);
294+
listener.onFailure(
295+
new ElasticsearchException(
296+
"Failure loading data from {} into {} caused by {}",
297+
firstBulkFailure.getCause(),
298+
sourceIndexName,
299+
destIndexName,
300+
firstBulkFailure.getCause().getMessage()
301+
)
302+
);
303+
} else {
304+
listener.onResponse(bulkByScrollResponse);
305+
}
306+
}, listener::onFailure);
307+
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
279308
}
280309

281310
private void updateSettings(

x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.Collections;
3232

33+
import static org.mockito.ArgumentMatchers.any;
3334
import static org.mockito.ArgumentMatchers.eq;
3435
import static org.mockito.Mockito.doNothing;
3536
import static org.mockito.Mockito.when;
@@ -111,7 +112,7 @@ public void testReindexIncludesRateLimit() {
111112
)
112113
);
113114

114-
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));
115+
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
115116

116117
action.reindex(sourceIndex, destIndex, listener, taskId);
117118

@@ -136,7 +137,7 @@ public void testReindexIncludesInfiniteRateLimit() {
136137
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
137138
)
138139
);
139-
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));
140+
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
140141

141142
action.reindex(sourceIndex, destIndex, listener, taskId);
142143

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 144 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,19 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
183183
}
184184

185185
public void testUpgradeDataStream() throws Exception {
186+
/*
187+
* This test tests upgrading a "normal" data stream (dataStreamName), and upgrading a data stream that was originally just an
188+
* ordinary index that was converted to a data stream (dataStreamFromNonDataStreamIndices).
189+
*/
186190
String dataStreamName = "reindex_test_data_stream";
191+
String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream";
187192
int numRollovers = randomIntBetween(0, 5);
188193
if (CLUSTER_TYPE == ClusterType.OLD) {
189194
createAndRolloverDataStream(dataStreamName, numRollovers);
195+
createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
190196
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
191-
upgradeDataStream(dataStreamName, numRollovers);
197+
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0);
198+
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 0, 1);
192199
}
193200
}
194201

@@ -266,7 +273,116 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
266273
}
267274
}
268275

269-
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
276+
private void createDataStreamFromNonDataStreamIndices(String dataStreamFromNonDataStreamIndices) throws IOException {
277+
/*
278+
* This method creates an index, creates an alias to that index, and then converts the aliased index into a data stream. This is
279+
* similar to the path that many indices (including system indices) took in versions 7/8.
280+
*/
281+
// First, we create an ordinary index with no @timestamp mapping:
282+
final String templateWithNoTimestamp = """
283+
{
284+
"mappings":{
285+
"properties": {
286+
"message": {
287+
"type": "text"
288+
}
289+
}
290+
}
291+
}
292+
""";
293+
// Note that this is not a data stream template:
294+
final String indexTemplate = """
295+
{
296+
"index_patterns": ["$PATTERN"],
297+
"template": $TEMPLATE
298+
}""";
299+
var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_index_template");
300+
putIndexTemplateRequest.setJsonEntity(
301+
indexTemplate.replace("$TEMPLATE", templateWithNoTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices + "-*")
302+
);
303+
assertOK(client().performRequest(putIndexTemplateRequest));
304+
String indexName = dataStreamFromNonDataStreamIndices + "-01";
305+
bulkLoadDataMissingTimestamp(indexName);
306+
/*
307+
* Next, we will change the index's mapping to include a @timestamp field since we are going to convert it to a data stream. But
308+
* first we have to flush the translog to disk because adding a @timestamp field will cause errors if it is done before the translog
309+
* is flushed:
310+
*/
311+
assertOK(client().performRequest(new Request("POST", indexName + "/_flush")));
312+
ensureHealth(indexName, (request -> {
313+
request.addParameter("wait_for_nodes", "3");
314+
request.addParameter("wait_for_status", "green");
315+
request.addParameter("timeout", "70s");
316+
request.addParameter("level", "shards");
317+
}));
318+
319+
// Updating the mapping to include @timestamp:
320+
Request updateIndexMappingRequest = new Request("PUT", indexName + "/_mapping");
321+
updateIndexMappingRequest.setJsonEntity("""
322+
{
323+
"properties": {
324+
"@timestamp" : {
325+
"type": "date"
326+
},
327+
"message": {
328+
"type": "text"
329+
}
330+
}
331+
}""");
332+
assertOK(client().performRequest(updateIndexMappingRequest));
333+
334+
// Creating an alias with the same name that the data stream will have:
335+
Request createAliasRequest = new Request("POST", "/_aliases");
336+
String aliasRequestBody = """
337+
{
338+
"actions": [
339+
{
340+
"add": {
341+
"index": "$index",
342+
"alias": "$alias"
343+
}
344+
}
345+
]
346+
}""";
347+
createAliasRequest.setJsonEntity(
348+
aliasRequestBody.replace("$index", indexName).replace("$alias", dataStreamFromNonDataStreamIndices)
349+
);
350+
assertOK(client().performRequest(createAliasRequest));
351+
352+
// This is now just an aliased index. We'll convert it into a data stream
353+
final String templateWithTimestamp = """
354+
{
355+
"mappings":{
356+
"properties": {
357+
"@timestamp" : {
358+
"type": "date"
359+
},
360+
"message": {
361+
"type": "text"
362+
}
363+
}
364+
}
365+
}
366+
""";
367+
final String dataStreamTemplate = """
368+
{
369+
"index_patterns": ["$PATTERN"],
370+
"template": $TEMPLATE,
371+
"data_stream": {
372+
}
373+
}""";
374+
var putDataStreamTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_data_stream_template");
375+
putDataStreamTemplateRequest.setJsonEntity(
376+
dataStreamTemplate.replace("$TEMPLATE", templateWithTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices)
377+
);
378+
assertOK(client().performRequest(putDataStreamTemplateRequest));
379+
Request migrateToDataStreamRequest = new Request("POST", "/_data_stream/_migrate/" + dataStreamFromNonDataStreamIndices);
380+
assertOK(client().performRequest(migrateToDataStreamRequest));
381+
}
382+
383+
@SuppressWarnings("unchecked")
384+
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster, int expectedSuccessesCount, int expectedErrorCount)
385+
throws Exception {
270386
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
271387
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
272388
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
@@ -334,16 +450,19 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
334450
statusResponseMap.get("total_indices_requiring_upgrade"),
335451
equalTo(originalWriteIndex + numRolloversOnOldCluster)
336452
);
337-
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
453+
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(expectedSuccessesCount));
338454
// We expect all the original indices to have been deleted
339-
for (String oldIndex : indicesNeedingUpgrade) {
340-
assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
455+
if (expectedErrorCount == 0) {
456+
for (String oldIndex : indicesNeedingUpgrade) {
457+
assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
458+
}
341459
}
342460
assertThat(
343461
statusResponseString,
344462
getDataStreamIndices(dataStreamName).size(),
345463
equalTo(expectedTotalIndicesInDataStream)
346464
);
465+
assertThat(statusResponseString, ((List<Object>) statusResponseMap.get("errors")).size(), equalTo(expectedErrorCount));
347466
}
348467
}, 60, TimeUnit.SECONDS);
349468
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
@@ -399,6 +518,26 @@ private static void bulkLoadData(String dataStreamName) throws IOException {
399518
assertOK(response);
400519
}
401520

521+
/*
522+
* This bulkloads data, where some documents have no @timestamp field and some do.
523+
*/
524+
private static void bulkLoadDataMissingTimestamp(String dataStreamName) throws IOException {
525+
final String bulk = """
526+
{"create": {}}
527+
{"metricset": "pod", "k8s": {"pod": {"name": "cat", "network": {"tx": 2001818691, "rx": 802133794}}}}
528+
{"create": {}}
529+
{"metricset": "pod", "k8s": {"pod": {"name": "hamster", "network": {"tx": 2005177954, "rx": 801479970}}}}
530+
{"create": {}}
531+
{"metricset": "pod", "k8s": {"pod": {"name": "cow", "network": {"tx": 2006223737, "rx": 802337279}}}}
532+
{"create": {}}
533+
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "network": {"tx": 2012916202, "rx": 803685721}}}}
534+
""";
535+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
536+
bulkRequest.setJsonEntity(bulk.replace("$now", formatInstant(Instant.now())));
537+
var response = client().performRequest(bulkRequest);
538+
assertOK(response);
539+
}
540+
402541
static String formatInstant(Instant instant) {
403542
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
404543
}

0 commit comments

Comments (0)