Skip to content
Merged
5 changes: 5 additions & 0 deletions docs/changelog/118941.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118941
summary: Allow archive and searchable snapshots indices in N-2 version
area: Recovery
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.carrotsearch.randomizedtesting.annotations.TestCaseOrdering;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
Expand All @@ -28,12 +30,15 @@

import java.util.Comparator;
import java.util.Locale;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.elasticsearch.test.cluster.util.Version.CURRENT;
import static org.elasticsearch.test.cluster.util.Version.fromString;
import static org.elasticsearch.test.rest.ObjectPath.createFromResponse;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
Expand Down Expand Up @@ -113,6 +118,12 @@ protected String suffix(String name) {
return name + '-' + getTestName().split(" ")[0].toLowerCase(Locale.ROOT);
}

protected Settings repositorySettings() {
return Settings.builder()
.put("location", REPOSITORY_PATH.getRoot().toPath().resolve(suffix("location")).toFile().getPath())
.build();
}

protected static Version clusterVersion() throws Exception {
var response = assertOK(client().performRequest(new Request("GET", "/")));
var responseBody = createFromResponse(response);
Expand All @@ -121,12 +132,56 @@ protected static Version clusterVersion() throws Exception {
return version;
}

protected static Version indexLuceneVersion(String indexName) throws Exception {
protected static Version indexVersion(String indexName) throws Exception {
var response = assertOK(client().performRequest(new Request("GET", "/" + indexName + "/_settings")));
int id = Integer.parseInt(createFromResponse(response).evaluate(indexName + ".settings.index.version.created"));
return new Version((byte) ((id / 1000000) % 100), (byte) ((id / 10000) % 100), (byte) ((id / 100) % 100));
}

protected static void indexDocs(String indexName, int numDocs) throws Exception {
var request = new Request("POST", "/_bulk");
var docs = new StringBuilder();
IntStream.range(0, numDocs).forEach(n -> docs.append(Strings.format("""
{"index":{"_id":"%s","_index":"%s"}}
{"test":"test"}
""", n, indexName)));
request.setJsonEntity(docs.toString());
var response = assertOK(client().performRequest(request));
assertThat(entityAsMap(response).get("errors"), allOf(notNullValue(), is(false)));
}

protected static void mountIndex(String repository, String snapshot, String indexName, boolean partial, String renamedIndexName)
throws Exception {
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_mount");
request.addParameter("wait_for_completion", "true");
var storage = partial ? "shared_cache" : "full_copy";
request.addParameter("storage", storage);
request.setJsonEntity(Strings.format("""
{
"index": "%s",
"renamed_index": "%s"
}""", indexName, renamedIndexName));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.successful")));
assertThat(responseBody.evaluate("snapshot.shards.failed"), equalTo(0));
}

protected static void restoreIndex(String repository, String snapshot, String indexName, String renamedIndexName) throws Exception {
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_restore");
request.addParameter("wait_for_completion", "true");
request.setJsonEntity(org.elasticsearch.common.Strings.format("""
{
"indices": "%s",
"include_global_state": false,
"rename_pattern": "(.+)",
"rename_replacement": "%s",
"include_aliases": false
}""", indexName, renamedIndexName));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.failed")));
assertThat(responseBody.evaluate("snapshot.shards.successful"), equalTo(0));
}

/**
* Execute the test suite with the parameters provided by the {@link #parameters()} in version order.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@
package org.elasticsearch.lucene;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.cluster.util.Version;

import java.util.stream.IntStream;

import static org.elasticsearch.test.rest.ObjectPath.createFromResponse;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestCase {

Expand All @@ -35,22 +33,19 @@ public LuceneCompatibilityIT(Version version) {
super(version);
}

/**
* Creates an index and a snapshot on N-2, then restores the snapshot on N.
*/
public void testRestoreIndex() throws Exception {
final String repository = suffix("repository");
final String snapshot = suffix("snapshot");
final String index = suffix("index");
final int numDocs = 1234;

logger.debug("--> registering repository [{}]", repository);
registerRepository(
client(),
repository,
FsRepository.TYPE,
true,
Settings.builder().put("location", REPOSITORY_PATH.getRoot().getPath()).build()
);

if (VERSION_MINUS_2.equals(clusterVersion())) {
logger.debug("--> registering repository [{}]", repository);
registerRepository(client(), repository, FsRepository.TYPE, true, repositorySettings());

logger.debug("--> creating index [{}]", index);
createIndex(
client(),
Expand All @@ -63,17 +58,7 @@ public void testRestoreIndex() throws Exception {
);

logger.debug("--> indexing [{}] docs in [{}]", numDocs, index);
final var bulks = new StringBuilder();
IntStream.range(0, numDocs).forEach(n -> bulks.append(Strings.format("""
{"index":{"_id":"%s","_index":"%s"}}
{"test":"test"}
""", n, index)));

var bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setJsonEntity(bulks.toString());
var bulkResponse = client().performRequest(bulkRequest);
assertOK(bulkResponse);
assertThat(entityAsMap(bulkResponse).get("errors"), allOf(notNullValue(), is(false)));
indexDocs(index, numDocs);

logger.debug("--> creating snapshot [{}]", snapshot);
createSnapshot(client(), repository, snapshot, true);
Expand All @@ -83,7 +68,7 @@ public void testRestoreIndex() throws Exception {
if (VERSION_MINUS_1.equals(clusterVersion())) {
ensureGreen(index);

assertThat(indexLuceneVersion(index), equalTo(VERSION_MINUS_2));
assertThat(indexVersion(index), equalTo(VERSION_MINUS_2));
assertDocCount(client(), index, numDocs);

logger.debug("--> deleting index [{}]", index);
Expand All @@ -93,9 +78,9 @@ public void testRestoreIndex() throws Exception {

if (VERSION_CURRENT.equals(clusterVersion())) {
var restoredIndex = suffix("index-restored");
logger.debug("--> restoring index [{}] as archive [{}]", index, restoredIndex);
logger.debug("--> restoring index [{}] as [{}]", index, restoredIndex);

// Restoring the archive will fail as Elasticsearch does not support reading N-2 yet
// Restoring the index will fail as Elasticsearch does not support reading N-2 yet
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_restore");
request.addParameter("wait_for_completion", "true");
request.setJsonEntity(Strings.format("""
Expand All @@ -106,9 +91,20 @@ public void testRestoreIndex() throws Exception {
"rename_replacement": "%s",
"include_aliases": false
}""", index, restoredIndex));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.failed")));
assertThat(responseBody.evaluate("snapshot.shards.successful"), equalTo(0));

var responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertEquals(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), responseException.getResponse().getStatusLine().getStatusCode());
assertThat(
responseException.getMessage(),
allOf(
containsString("cannot restore index [[" + index),
containsString("because it cannot be upgraded"),
containsString("has current compatibility version [" + VERSION_MINUS_2 + '-' + VERSION_MINUS_1.getMajor() + ".0.0]"),
containsString("but the minimum compatible version is [" + VERSION_MINUS_1.getMajor() + ".0.0]."),
containsString("It should be re-indexed in Elasticsearch " + VERSION_MINUS_1.getMajor() + ".x"),
containsString("before upgrading to " + VERSION_CURRENT)
)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,13 @@

package org.elasticsearch.lucene;

import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.cluster.util.Version;

import java.util.stream.IntStream;

import static org.elasticsearch.test.rest.ObjectPath.createFromResponse;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class SearchableSnapshotCompatibilityIT extends AbstractLuceneIndexCompatibilityTestCase {

Expand All @@ -37,24 +29,19 @@ public SearchableSnapshotCompatibilityIT(Version version) {
super(version);
}

// TODO Add a test to mount the N-2 index on N-1 and then search it on N

/**
* Creates an index and a snapshot on N-2, then mounts the snapshot on N.
*/
public void testSearchableSnapshot() throws Exception {
final String repository = suffix("repository");
final String snapshot = suffix("snapshot");
final String index = suffix("index");
final int numDocs = 1234;

logger.debug("--> registering repository [{}]", repository);
registerRepository(
client(),
repository,
FsRepository.TYPE,
true,
Settings.builder().put("location", REPOSITORY_PATH.getRoot().getPath()).build()
);

if (VERSION_MINUS_2.equals(clusterVersion())) {
logger.debug("--> registering repository [{}]", repository);
registerRepository(client(), repository, FsRepository.TYPE, true, repositorySettings());

logger.debug("--> creating index [{}]", index);
createIndex(
client(),
Expand All @@ -67,17 +54,7 @@ public void testSearchableSnapshot() throws Exception {
);

logger.debug("--> indexing [{}] docs in [{}]", numDocs, index);
final var bulks = new StringBuilder();
IntStream.range(0, numDocs).forEach(n -> bulks.append(Strings.format("""
{"index":{"_id":"%s","_index":"%s"}}
{"test":"test"}
""", n, index)));

var bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setJsonEntity(bulks.toString());
var bulkResponse = client().performRequest(bulkRequest);
assertOK(bulkResponse);
assertThat(entityAsMap(bulkResponse).get("errors"), allOf(notNullValue(), is(false)));
indexDocs(index, numDocs);

logger.debug("--> creating snapshot [{}]", snapshot);
createSnapshot(client(), repository, snapshot, true);
Expand All @@ -87,7 +64,7 @@ public void testSearchableSnapshot() throws Exception {
if (VERSION_MINUS_1.equals(clusterVersion())) {
ensureGreen(index);

assertThat(indexLuceneVersion(index), equalTo(VERSION_MINUS_2));
assertThat(indexVersion(index), equalTo(VERSION_MINUS_2));
assertDocCount(client(), index, numDocs);

logger.debug("--> deleting index [{}]", index);
Expand All @@ -98,20 +75,75 @@ public void testSearchableSnapshot() throws Exception {
if (VERSION_CURRENT.equals(clusterVersion())) {
var mountedIndex = suffix("index-mounted");
logger.debug("--> mounting index [{}] as [{}]", index, mountedIndex);
mountIndex(repository, snapshot, index, randomBoolean(), mountedIndex);

ensureGreen(mountedIndex);

assertThat(indexVersion(mountedIndex), equalTo(VERSION_MINUS_2));
assertDocCount(client(), mountedIndex, numDocs);

logger.debug("--> adding replica to test peer-recovery");
updateIndexSettings(mountedIndex, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(mountedIndex);
}
}

/**
* Creates an index and a snapshot on N-2, mounts the snapshot on N -1 and then upgrades to N.
*/
public void testSearchableSnapshotUpgrade() throws Exception {
final String mountedIndex = suffix("index-mounted");
final String repository = suffix("repository");
final String snapshot = suffix("snapshot");
final String index = suffix("index");
final int numDocs = 4321;

if (VERSION_MINUS_2.equals(clusterVersion())) {
logger.debug("--> registering repository [{}]", repository);
registerRepository(client(), repository, FsRepository.TYPE, true, repositorySettings());

logger.debug("--> creating index [{}]", index);
createIndex(
client(),
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build()
);

logger.debug("--> indexing [{}] docs in [{}]", numDocs, index);
indexDocs(index, numDocs);

logger.debug("--> creating snapshot [{}]", snapshot);
createSnapshot(client(), repository, snapshot, true);

logger.debug("--> deleting index [{}]", index);
deleteIndex(index);
return;
}

if (VERSION_MINUS_1.equals(clusterVersion())) {
logger.debug("--> mounting index [{}] as [{}]", index, mountedIndex);
mountIndex(repository, snapshot, index, randomBoolean(), mountedIndex);

ensureGreen(mountedIndex);

assertThat(indexVersion(mountedIndex), equalTo(VERSION_MINUS_2));
assertDocCount(client(), mountedIndex, numDocs);
return;
}

if (VERSION_CURRENT.equals(clusterVersion())) {
ensureGreen(mountedIndex);

assertThat(indexVersion(mountedIndex), equalTo(VERSION_MINUS_2));
assertDocCount(client(), mountedIndex, numDocs);

// Mounting the index will fail as Elasticsearch does not support reading N-2 yet
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_mount");
request.addParameter("wait_for_completion", "true");
var storage = randomBoolean() ? "shared_cache" : "full_copy";
request.addParameter("storage", storage);
request.setJsonEntity(Strings.format("""
{
"index": "%s",
"renamed_index": "%s"
}""", index, mountedIndex));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.failed")));
assertThat(responseBody.evaluate("snapshot.shards.successful"), equalTo(0));
logger.debug("--> adding replica to test peer-recovery");
updateIndexSettings(mountedIndex, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(mountedIndex);
}
}
}
Loading