@@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@@ -44,6 +45,7 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
@@ -275,7 +277,35 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
reindexRequest.setParentTask(parentTaskId);
reindexRequest.setRequestsPerSecond(clusterService.getClusterSettings().get(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING));
reindexRequest.setSlices(0); // equivalent to slices=auto in rest api
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
// Since we delete the source index on success, we want to fail the whole job if there are _any_ documents that fail to reindex:
ActionListener<BulkByScrollResponse> checkForFailuresListener = ActionListener.wrap(bulkByScrollResponse -> {
if (bulkByScrollResponse.getSearchFailures().isEmpty() == false) {
ScrollableHitSource.SearchFailure firstSearchFailure = bulkByScrollResponse.getSearchFailures().get(0);
listener.onFailure(
new ElasticsearchException(
"Failure reading data from {} caused by {}",
firstSearchFailure.getReason(),
sourceIndexName,
firstSearchFailure.getReason().getMessage()
)
);
} else if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
BulkItemResponse.Failure firstBulkFailure = bulkByScrollResponse.getBulkFailures().get(0);
listener.onFailure(
new ElasticsearchException(
"Failure loading data from {} into {} caused by {}",
firstBulkFailure.getCause(),
sourceIndexName,
destIndexName,
firstBulkFailure.getCause().getMessage()
)
);
} else {
listener.onResponse(bulkByScrollResponse);
}
}, listener::onFailure);
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
}

private void updateSettings(
@@ -183,12 +183,19 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
}

public void testUpgradeDataStream() throws Exception {
/*
* This test covers upgrading a "normal" data stream (dataStreamName), and upgrading a data stream that was originally just an
* ordinary index that was converted to a data stream (dataStreamFromNonDataStreamIndices).
*/
String dataStreamName = "reindex_test_data_stream";
String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream";
int numRollovers = randomIntBetween(0, 5);
if (CLUSTER_TYPE == ClusterType.OLD) {
createAndRolloverDataStream(dataStreamName, numRollovers);
createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
upgradeDataStream(dataStreamName, numRollovers);
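// The regular data stream should report numRollovers + 1 successful upgrades (its rolled-over indices plus the write index)
// and no errors; the index-first data stream is expected to report no successes and one error, presumably because its backing
// index was loaded with documents that have no @timestamp field: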
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0);
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 0, 1);
}
}

@@ -266,7 +273,116 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
}
}

private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
private void createDataStreamFromNonDataStreamIndices(String dataStreamFromNonDataStreamIndices) throws IOException {
/*
* This method creates an index, creates an alias to that index, and then converts the aliased index into a data stream. This is
* similar to the path that many indices (including system indices) took in versions 7/8.
*/
// First, we create an ordinary index with no @timestamp mapping:
final String templateWithNoTimestamp = """
{
"mappings":{
"properties": {
"message": {
"type": "text"
}
}
}
}
""";
// Note that this is not a data stream template:
final String indexTemplate = """
{
"index_patterns": ["$PATTERN"],
"template": $TEMPLATE
}""";
var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_index_template");
putIndexTemplateRequest.setJsonEntity(
indexTemplate.replace("$TEMPLATE", templateWithNoTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices + "-*")
);
assertOK(client().performRequest(putIndexTemplateRequest));
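// Bulk indexing into this name creates the index via the template above; most of the documents are intentionally missing
// the @timestamp field: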
String indexName = dataStreamFromNonDataStreamIndices + "-01";
bulkLoadDataMissingTimestamp(indexName);
/*
* Next, we change the index's mapping to include a @timestamp field, since we are about to convert it to a data stream. But first
* we have to flush the translog to disk, because adding the @timestamp field before the translog has been flushed causes errors:
*/
assertOK(client().performRequest(new Request("POST", indexName + "/_flush")));
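// Wait for the cluster to be green before updating the mapping: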
ensureHealth(indexName, (request -> {
request.addParameter("wait_for_nodes", "3");
request.addParameter("wait_for_status", "green");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
}));

// Updating the mapping to include @timestamp:
Request updateIndexMappingRequest = new Request("PUT", indexName + "/_mapping");
updateIndexMappingRequest.setJsonEntity("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"message": {
"type": "text"
}
}
}""");
assertOK(client().performRequest(updateIndexMappingRequest));

// Creating an alias with the same name that the data stream will have:
Request createAliasRequest = new Request("POST", "/_aliases");
String aliasRequestBody = """
{
"actions": [
{
"add": {
"index": "$index",
"alias": "$alias"
}
}
]
}""";
createAliasRequest.setJsonEntity(
aliasRequestBody.replace("$index", indexName).replace("$alias", dataStreamFromNonDataStreamIndices)
);
assertOK(client().performRequest(createAliasRequest));

// This is now just an aliased index. We'll convert it into a data stream:
final String templateWithTimestamp = """
{
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"message": {
"type": "text"
}
}
}
}
""";
final String dataStreamTemplate = """
{
"index_patterns": ["$PATTERN"],
"template": $TEMPLATE,
"data_stream": {
}
}""";
var putDataStreamTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_data_stream_template");
putDataStreamTemplateRequest.setJsonEntity(
dataStreamTemplate.replace("$TEMPLATE", templateWithTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices)
);
assertOK(client().performRequest(putDataStreamTemplateRequest));
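// With a matching data stream template in place, migrate the aliased index into a data stream: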
Request migrateToDataStreamRequest = new Request("POST", "/_data_stream/_migrate/" + dataStreamFromNonDataStreamIndices);
assertOK(client().performRequest(migrateToDataStreamRequest));
}

@SuppressWarnings("unchecked")
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster, int expectedSuccessesCount, int expectedErrorCount)
throws Exception {
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
@@ -334,16 +450,19 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
statusResponseMap.get("total_indices_requiring_upgrade"),
equalTo(originalWriteIndex + numRolloversOnOldCluster)
);
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(expectedSuccessesCount));
// We expect all the original indices to have been deleted
for (String oldIndex : indicesNeedingUpgrade) {
assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
if (expectedErrorCount == 0) {
for (String oldIndex : indicesNeedingUpgrade) {
assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
}
}
assertThat(
statusResponseString,
getDataStreamIndices(dataStreamName).size(),
equalTo(expectedTotalIndicesInDataStream)
);
assertThat(statusResponseString, ((List<Object>) statusResponseMap.get("errors")).size(), equalTo(expectedErrorCount));
}
}, 60, TimeUnit.SECONDS);
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
@@ -399,6 +518,26 @@ private static void bulkLoadData(String dataStreamName) throws IOException {
assertOK(response);
}

/*
* This bulk loads data where some documents have no @timestamp field and some do.
*/
private static void bulkLoadDataMissingTimestamp(String dataStreamName) throws IOException {
final String bulk = """
{"create": {}}
{"metricset": "pod", "k8s": {"pod": {"name": "cat", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"metricset": "pod", "k8s": {"pod": {"name": "hamster", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"metricset": "pod", "k8s": {"pod": {"name": "cow", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "network": {"tx": 2012916202, "rx": 803685721}}}}
""";
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
bulkRequest.setJsonEntity(bulk.replace("$now", formatInstant(Instant.now())));
var response = client().performRequest(bulkRequest);
assertOK(response);
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}