Skip to content

Commit de64d93

Browse files
authored
Fail the reindex data stream task if any document fails to reindex (#121591) (#121679)
1 parent bb7e477 commit de64d93

File tree

3 files changed

+177
-8
lines changed

3 files changed

+177
-8
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
2626
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
2727
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
28+
import org.elasticsearch.action.bulk.BulkItemResponse;
2829
import org.elasticsearch.action.search.SearchRequest;
2930
import org.elasticsearch.action.support.ActionFilters;
3031
import org.elasticsearch.action.support.HandledTransportAction;
@@ -44,6 +45,7 @@
4445
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4546
import org.elasticsearch.index.reindex.ReindexAction;
4647
import org.elasticsearch.index.reindex.ReindexRequest;
48+
import org.elasticsearch.index.reindex.ScrollableHitSource;
4749
import org.elasticsearch.injection.guice.Inject;
4850
import org.elasticsearch.search.builder.SearchSourceBuilder;
4951
import org.elasticsearch.tasks.Task;
@@ -275,7 +277,34 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
275277
reindexRequest.setParentTask(parentTaskId);
276278
reindexRequest.setRequestsPerSecond(clusterService.getClusterSettings().get(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING));
277279
reindexRequest.setSlices(0); // equivalent to slices=auto in rest api
278-
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
280+
// Since we delete the source index on success, we want to fail the whole job if there are _any_ documents that fail to reindex:
281+
ActionListener<BulkByScrollResponse> checkForFailuresListener = ActionListener.wrap(bulkByScrollResponse -> {
282+
if (bulkByScrollResponse.getSearchFailures().isEmpty() == false) {
283+
ScrollableHitSource.SearchFailure firstSearchFailure = bulkByScrollResponse.getSearchFailures().get(0);
284+
listener.onFailure(
285+
new ElasticsearchException(
286+
"Failure reading data from {} caused by {}",
287+
firstSearchFailure.getReason(),
288+
sourceIndexName,
289+
firstSearchFailure.getReason().getMessage()
290+
)
291+
);
292+
} else if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
293+
BulkItemResponse.Failure firstBulkFailure = bulkByScrollResponse.getBulkFailures().get(0);
294+
listener.onFailure(
295+
new ElasticsearchException(
296+
"Failure loading data from {} into {} caused by {}",
297+
firstBulkFailure.getCause(),
298+
sourceIndexName,
299+
destIndexName,
300+
firstBulkFailure.getCause().getMessage()
301+
)
302+
);
303+
} else {
304+
listener.onResponse(bulkByScrollResponse);
305+
}
306+
}, listener::onFailure);
307+
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
279308
}
280309

281310
private void updateSettings(

x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.Collections;
3232

33+
import static org.mockito.ArgumentMatchers.any;
3334
import static org.mockito.ArgumentMatchers.eq;
3435
import static org.mockito.Mockito.doNothing;
3536
import static org.mockito.Mockito.when;
@@ -111,7 +112,7 @@ public void testReindexIncludesRateLimit() {
111112
)
112113
);
113114

114-
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));
115+
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
115116

116117
action.reindex(sourceIndex, destIndex, listener, taskId);
117118

@@ -136,7 +137,7 @@ public void testReindexIncludesInfiniteRateLimit() {
136137
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
137138
)
138139
);
139-
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));
140+
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
140141

141142
action.reindex(sourceIndex, destIndex, listener, taskId);
142143

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 144 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,19 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
183183
}
184184

185185
public void testUpgradeDataStream() throws Exception {
186+
/*
187+
* This test tests upgrading a "normal" data stream (dataStreamName), and upgrading a data stream that was originally just an
188+
* ordinary index that was converted to a data stream (dataStreamFromNonDataStreamIndices).
189+
*/
186190
String dataStreamName = "reindex_test_data_stream";
191+
String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream";
187192
int numRollovers = randomIntBetween(0, 5);
188193
if (CLUSTER_TYPE == ClusterType.OLD) {
189194
createAndRolloverDataStream(dataStreamName, numRollovers);
195+
createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
190196
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
191-
upgradeDataStream(dataStreamName, numRollovers);
197+
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0);
198+
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 0, 1);
192199
}
193200
}
194201

@@ -261,7 +268,116 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
261268
}
262269
}
263270

264-
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
271+
private void createDataStreamFromNonDataStreamIndices(String dataStreamFromNonDataStreamIndices) throws IOException {
272+
/*
273+
* This method creates an index, creates an alias to that index, and then converts the aliased index into a data stream. This is
274+
* similar to the path that many indices (including system indices) took in versions 7/8.
275+
*/
276+
// First, we create an ordinary index with no @timestamp mapping:
277+
final String templateWithNoTimestamp = """
278+
{
279+
"mappings":{
280+
"properties": {
281+
"message": {
282+
"type": "text"
283+
}
284+
}
285+
}
286+
}
287+
""";
288+
// Note that this is not a data stream template:
289+
final String indexTemplate = """
290+
{
291+
"index_patterns": ["$PATTERN"],
292+
"template": $TEMPLATE
293+
}""";
294+
var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_index_template");
295+
putIndexTemplateRequest.setJsonEntity(
296+
indexTemplate.replace("$TEMPLATE", templateWithNoTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices + "-*")
297+
);
298+
assertOK(client().performRequest(putIndexTemplateRequest));
299+
String indexName = dataStreamFromNonDataStreamIndices + "-01";
300+
bulkLoadDataMissingTimestamp(indexName);
301+
/*
302+
* Next, we will change the index's mapping to include a @timestamp field since we are going to convert it to a data stream. But
303+
* first we have to flush the translog to disk because adding a @timestamp field will cause errors if it is done before the translog
304+
* is flushed:
305+
*/
306+
assertOK(client().performRequest(new Request("POST", indexName + "/_flush")));
307+
ensureHealth(indexName, (request -> {
308+
request.addParameter("wait_for_nodes", "3");
309+
request.addParameter("wait_for_status", "green");
310+
request.addParameter("timeout", "70s");
311+
request.addParameter("level", "shards");
312+
}));
313+
314+
// Updating the mapping to include @timestamp:
315+
Request updateIndexMappingRequest = new Request("PUT", indexName + "/_mapping");
316+
updateIndexMappingRequest.setJsonEntity("""
317+
{
318+
"properties": {
319+
"@timestamp" : {
320+
"type": "date"
321+
},
322+
"message": {
323+
"type": "text"
324+
}
325+
}
326+
}""");
327+
assertOK(client().performRequest(updateIndexMappingRequest));
328+
329+
// Creating an alias with the same name that the data stream will have:
330+
Request createAliasRequest = new Request("POST", "/_aliases");
331+
String aliasRequestBody = """
332+
{
333+
"actions": [
334+
{
335+
"add": {
336+
"index": "$index",
337+
"alias": "$alias"
338+
}
339+
}
340+
]
341+
}""";
342+
createAliasRequest.setJsonEntity(
343+
aliasRequestBody.replace("$index", indexName).replace("$alias", dataStreamFromNonDataStreamIndices)
344+
);
345+
assertOK(client().performRequest(createAliasRequest));
346+
347+
// This is now just an aliased index. We'll convert it into a data stream
348+
final String templateWithTimestamp = """
349+
{
350+
"mappings":{
351+
"properties": {
352+
"@timestamp" : {
353+
"type": "date"
354+
},
355+
"message": {
356+
"type": "text"
357+
}
358+
}
359+
}
360+
}
361+
""";
362+
final String dataStreamTemplate = """
363+
{
364+
"index_patterns": ["$PATTERN"],
365+
"template": $TEMPLATE,
366+
"data_stream": {
367+
}
368+
}""";
369+
var putDataStreamTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_data_stream_template");
370+
putDataStreamTemplateRequest.setJsonEntity(
371+
dataStreamTemplate.replace("$TEMPLATE", templateWithTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices)
372+
);
373+
assertOK(client().performRequest(putDataStreamTemplateRequest));
374+
Request migrateToDataStreamRequest = new Request("POST", "/_data_stream/_migrate/" + dataStreamFromNonDataStreamIndices);
375+
assertOK(client().performRequest(migrateToDataStreamRequest));
376+
}
377+
378+
@SuppressWarnings("unchecked")
379+
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster, int expectedSuccessesCount, int expectedErrorCount)
380+
throws Exception {
265381
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
266382
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
267383
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
@@ -329,16 +445,19 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
329445
statusResponseMap.get("total_indices_requiring_upgrade"),
330446
equalTo(originalWriteIndex + numRolloversOnOldCluster)
331447
);
332-
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
448+
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(expectedSuccessesCount));
333449
// If there were no errors, we expect all the original indices to have been deleted
334-
for (String oldIndex : indicesNeedingUpgrade) {
335-
assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
450+
if (expectedErrorCount == 0) {
451+
for (String oldIndex : indicesNeedingUpgrade) {
452+
assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
453+
}
336454
}
337455
assertThat(
338456
statusResponseString,
339457
getDataStreamIndices(dataStreamName).size(),
340458
equalTo(expectedTotalIndicesInDataStream)
341459
);
460+
assertThat(statusResponseString, ((List<Object>) statusResponseMap.get("errors")).size(), equalTo(expectedErrorCount));
342461
}
343462
}, 60, TimeUnit.SECONDS);
344463
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
@@ -394,6 +513,26 @@ private static void bulkLoadData(String dataStreamName) throws IOException {
394513
assertOK(response);
395514
}
396515

516+
/*
517+
* This bulk-loads data in which some documents have no @timestamp field and some do.
518+
*/
519+
private static void bulkLoadDataMissingTimestamp(String dataStreamName) throws IOException {
520+
final String bulk = """
521+
{"create": {}}
522+
{"metricset": "pod", "k8s": {"pod": {"name": "cat", "network": {"tx": 2001818691, "rx": 802133794}}}}
523+
{"create": {}}
524+
{"metricset": "pod", "k8s": {"pod": {"name": "hamster", "network": {"tx": 2005177954, "rx": 801479970}}}}
525+
{"create": {}}
526+
{"metricset": "pod", "k8s": {"pod": {"name": "cow", "network": {"tx": 2006223737, "rx": 802337279}}}}
527+
{"create": {}}
528+
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "network": {"tx": 2012916202, "rx": 803685721}}}}
529+
""";
530+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
531+
bulkRequest.setJsonEntity(bulk.replace("$now", formatInstant(Instant.now())));
532+
var response = client().performRequest(bulkRequest);
533+
assertOK(response);
534+
}
535+
397536
static String formatInstant(Instant instant) {
398537
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
399538
}

0 commit comments

Comments
 (0)