Skip to content

Commit 0c8daae

Browse files
authored
Make SnapshotsInProgress project compatible (#125470)
This PR adds project-id to both SnapshotsInProgress and Snapshot so that they are aware of projects and ready to handle snapshots from multiple projects. Relates: ES-10224
1 parent b882e76 commit 0c8daae

File tree

11 files changed

+412
-51
lines changed

11 files changed

+412
-51
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.Name;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.test.rest.ObjectPath;
18+
19+
import java.io.IOException;
20+
import java.util.Collection;
21+
22+
import static org.elasticsearch.upgrades.SnapshotBasedRecoveryIT.indexDocs;
23+
import static org.hamcrest.Matchers.empty;
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.not;
26+
27+
public class RunningSnapshotIT extends AbstractRollingUpgradeTestCase {
28+
29+
public RunningSnapshotIT(@Name("upgradedNodes") int upgradedNodes) {
30+
super(upgradedNodes);
31+
}
32+
33+
public void testRunningSnapshotCompleteAfterUpgrade() throws Exception {
34+
final String indexName = "index";
35+
final String repositoryName = "repo";
36+
final String snapshotName = "snapshot";
37+
final var nodeIds = getNodesInfo(client()).keySet();
38+
39+
if (isOldCluster()) {
40+
registerRepository(repositoryName, "fs", randomBoolean(), Settings.builder().put("location", "backup").build());
41+
// create an index to have one shard per node
42+
createIndex(indexName, indexSettings(3, 0).put("index.routing.allocation.total_shards_per_node", 1).build());
43+
ensureGreen(indexName);
44+
if (randomBoolean()) {
45+
indexDocs(indexName, between(10, 50));
46+
}
47+
flush(indexName, true);
48+
// Signal shutdown to prevent snapshot from being completed
49+
putShutdownMetadata(nodeIds);
50+
createSnapshot(repositoryName, snapshotName, false);
51+
assertRunningSnapshot(repositoryName, snapshotName);
52+
} else {
53+
if (isUpgradedCluster()) {
54+
deleteShutdownMetadata(nodeIds);
55+
assertNoShutdownMetadata(nodeIds);
56+
ensureGreen(indexName);
57+
assertBusy(() -> assertCompletedSnapshot(repositoryName, snapshotName));
58+
} else {
59+
assertRunningSnapshot(repositoryName, snapshotName);
60+
}
61+
}
62+
}
63+
64+
private void putShutdownMetadata(Collection<String> nodeIds) throws IOException {
65+
for (String nodeId : nodeIds) {
66+
final Request putShutdownRequest = new Request("PUT", "/_nodes/" + nodeId + "/shutdown");
67+
putShutdownRequest.setJsonEntity("""
68+
{
69+
"type": "remove",
70+
"reason": "test"
71+
}""");
72+
client().performRequest(putShutdownRequest);
73+
}
74+
}
75+
76+
private void deleteShutdownMetadata(Collection<String> nodeIds) throws IOException {
77+
for (String nodeId : nodeIds) {
78+
final Request request = new Request("DELETE", "/_nodes/" + nodeId + "/shutdown");
79+
client().performRequest(request);
80+
}
81+
}
82+
83+
private void assertNoShutdownMetadata(Collection<String> nodeIds) throws IOException {
84+
final ObjectPath responsePath = assertOKAndCreateObjectPath(
85+
client().performRequest(new Request("GET", "/_nodes/" + Strings.collectionToCommaDelimitedString(nodeIds) + "/shutdown"))
86+
);
87+
assertThat(responsePath.evaluate("nodes"), empty());
88+
}
89+
90+
private void assertRunningSnapshot(String repositoryName, String snapshotName) throws IOException {
91+
final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/_current");
92+
final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request));
93+
assertThat(responsePath.evaluate("total"), equalTo(1));
94+
assertThat(responsePath.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
95+
}
96+
97+
private void assertCompletedSnapshot(String repositoryName, String snapshotName) throws IOException {
98+
{
99+
final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/_current");
100+
final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request));
101+
assertThat(responsePath.evaluate("total"), equalTo(0));
102+
}
103+
104+
{
105+
final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/" + snapshotName);
106+
final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request));
107+
assertThat(responsePath.evaluate("total"), equalTo(1));
108+
assertThat(responsePath.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
109+
assertThat(responsePath.evaluate("snapshots.0.state"), not(equalTo("IN_PROGRESS")));
110+
}
111+
}
112+
}

qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/SnapshotBasedRecoveryIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ private static Map<String, Object> search(String index, QueryBuilder query) thro
233233
return responseAsMap;
234234
}
235235

236-
private void indexDocs(String indexName, int numDocs) throws IOException {
236+
static void indexDocs(String indexName, int numDocs) throws IOException {
237237
final StringBuilder bulkBody = new StringBuilder();
238238
for (int i = 0; i < numDocs; i++) {
239239
bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ static TransportVersion def(int id) {
205205
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00);
206206
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);
207207
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
208+
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
208209

209210
/*
210211
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,29 @@ public static <K, T, M extends Map<K, T>> boolean hasKey(MapDiff<K, T, M> diff,
146146
return false;
147147
}
148148

149+
/**
150+
* Create a new JDK map backed MapDiff by transforming the keys with the provided keyFunction.
151+
* @param diff Original MapDiff to transform
152+
* @param keyFunction Function to transform the key
153+
* @param keySerializer Serializer for the new key
154+
*/
155+
public static <K1, K2, T extends Diffable<T>, M1 extends Map<K1, T>> MapDiff<K2, T, Map<K2, T>> jdkMapDiffWithUpdatedKeys(
156+
MapDiff<K1, T, M1> diff,
157+
Function<K1, K2> keyFunction,
158+
KeySerializer<K2> keySerializer
159+
) {
160+
final List<K2> deletes = diff.getDeletes().stream().map(keyFunction).toList();
161+
final List<Map.Entry<K2, Diff<T>>> diffs = diff.getDiffs()
162+
.stream()
163+
.map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue()))
164+
.toList();
165+
final List<Map.Entry<K2, T>> upserts = diff.getUpserts()
166+
.stream()
167+
.map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue()))
168+
.toList();
169+
return new MapDiff<>(keySerializer, DiffableValueSerializer.getWriteOnlyInstance(), deletes, diffs, upserts, JdkMapBuilder::new);
170+
}
171+
149172
/**
150173
* Creates a MapDiff that applies a single entry diff to a map
151174
*/

0 commit comments

Comments
 (0)