Skip to content

Commit d07986e

Browse files
committed
Make SnapshotsInProgress project compatible
This PR adds project-id to both SnapshotsInProgress and Snapshot so that they are aware of projects and ready to handle snapshots from multiple projects. Relates: ES-10224
1 parent 6c04abc commit d07986e

File tree

7 files changed

+284
-44
lines changed

7 files changed

+284
-44
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ static TransportVersion def(int id) {
199199
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
200200
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
201201
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
202+
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_037_0_00);
202203

203204
/*
204205
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,26 @@ public static <K, T, M extends Map<K, T>> boolean hasKey(MapDiff<K, T, M> diff,
146146
return false;
147147
}
148148

149+
/**
150+
* Create a new MapDiff by transforming the keys with the provided keyFunction
151+
*/
152+
public static <K1, K2, T extends Diffable<T>, M1 extends Map<K1, T>> MapDiff<K2, T, Map<K2, T>> jdkMapDiffWithUpdatedKeys(
153+
MapDiff<K1, T, M1> diff,
154+
Function<K1, K2> keyFunction,
155+
KeySerializer<K2> keySerializer
156+
) {
157+
final List<K2> deletes = diff.getDeletes().stream().map(keyFunction).toList();
158+
final List<Map.Entry<K2, Diff<T>>> diffs = diff.getDiffs()
159+
.stream()
160+
.map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue()))
161+
.toList();
162+
final List<Map.Entry<K2, T>> upserts = diff.getUpserts()
163+
.stream()
164+
.map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue()))
165+
.toList();
166+
return new MapDiff<>(keySerializer, DiffableValueSerializer.getWriteOnlyInstance(), deletes, diffs, upserts, JdkMapBuilder::new);
167+
}
168+
149169
/**
150170
* Creates a MapDiff that applies a single entry diff to a map
151171
*/

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 111 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.TransportVersions;
1414
import org.elasticsearch.cluster.ClusterState.Custom;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
1516
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.common.Strings;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.logging.Logger;
3334
import org.elasticsearch.repositories.IndexId;
3435
import org.elasticsearch.repositories.RepositoryOperation;
36+
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
3537
import org.elasticsearch.repositories.RepositoryShardId;
3638
import org.elasticsearch.repositories.ShardGeneration;
3739
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -56,6 +58,9 @@
5658
import java.util.Set;
5759
import java.util.stream.Stream;
5860

61+
import static org.elasticsearch.repositories.RepositoryOperation.PROJECT_REPO_SERIALIZER;
62+
import static org.elasticsearch.repositories.RepositoryOperation.defaultProjectRepo;
63+
5964
/**
6065
* Meta data about snapshots that are currently executing
6166
*/
@@ -70,7 +75,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
7075
public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion";
7176

7277
/** Maps repository name to list of snapshots in that repository */
73-
private final Map<String, ByRepo> entries;
78+
private final Map<ProjectRepo, ByRepo> entries;
7479

7580
/**
7681
* IDs of nodes which are marked for removal, or which were previously marked for removal and still have running shard snapshots.
@@ -104,56 +109,69 @@ private static Set<String> readNodeIdsForRemoval(StreamInput in) throws IOExcept
104109
: Set.of();
105110
}
106111

107-
private static Map<String, ByRepo> collectByRepo(StreamInput in) throws IOException {
112+
private static Map<ProjectRepo, ByRepo> collectByRepo(StreamInput in) throws IOException {
108113
final int count = in.readVInt();
109114
if (count == 0) {
110115
return Map.of();
111116
}
112-
final Map<String, List<Entry>> entriesByRepo = new HashMap<>();
117+
final Map<ProjectRepo, List<Entry>> entriesByRepo = new HashMap<>();
113118
for (int i = 0; i < count; i++) {
114119
final Entry entry = Entry.readFrom(in);
115-
entriesByRepo.computeIfAbsent(entry.repository(), repo -> new ArrayList<>()).add(entry);
120+
entriesByRepo.computeIfAbsent(new ProjectRepo(entry.projectId(), entry.repository()), repo -> new ArrayList<>()).add(entry);
116121
}
117-
final Map<String, ByRepo> res = Maps.newMapWithExpectedSize(entriesByRepo.size());
118-
for (Map.Entry<String, List<Entry>> entryForRepo : entriesByRepo.entrySet()) {
122+
final Map<ProjectRepo, ByRepo> res = Maps.newMapWithExpectedSize(entriesByRepo.size());
123+
for (Map.Entry<ProjectRepo, List<Entry>> entryForRepo : entriesByRepo.entrySet()) {
119124
res.put(entryForRepo.getKey(), new ByRepo(entryForRepo.getValue()));
120125
}
121126
return res;
122127
}
123128

124-
private SnapshotsInProgress(Map<String, ByRepo> entries, Set<String> nodesIdsForRemoval) {
129+
private SnapshotsInProgress(Map<ProjectRepo, ByRepo> entries, Set<String> nodesIdsForRemoval) {
125130
this.entries = Map.copyOf(entries);
126131
this.nodesIdsForRemoval = nodesIdsForRemoval;
127132
assert assertConsistentEntries(this.entries);
128133
}
129134

135+
@Deprecated(forRemoval = true)
130136
public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
131-
if (updatedEntries.equals(forRepo(repository))) {
137+
return withUpdatedEntriesForRepo(defaultProjectRepo(repository), updatedEntries);
138+
}
139+
140+
public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectRepo projectRepo, List<Entry> updatedEntries) {
141+
if (updatedEntries.equals(forRepo(projectRepo))) {
132142
return this;
133143
}
134-
final Map<String, ByRepo> copy = new HashMap<>(this.entries);
144+
final Map<ProjectRepo, ByRepo> copy = new HashMap<>(this.entries);
135145
if (updatedEntries.isEmpty()) {
136-
copy.remove(repository);
146+
copy.remove(projectRepo);
137147
if (copy.isEmpty()) {
138148
return EMPTY;
139149
}
140150
} else {
141-
copy.put(repository, new ByRepo(updatedEntries));
151+
copy.put(projectRepo, new ByRepo(updatedEntries));
142152
}
143153
return new SnapshotsInProgress(copy, nodesIdsForRemoval);
144154
}
145155

146156
public SnapshotsInProgress withAddedEntry(Entry entry) {
147-
final List<Entry> forRepo = new ArrayList<>(forRepo(entry.repository()));
157+
final List<Entry> forRepo = new ArrayList<>(forRepo(entry.projectRepo()));
148158
forRepo.add(entry);
149-
return withUpdatedEntriesForRepo(entry.repository(), forRepo);
159+
return withUpdatedEntriesForRepo(entry.projectRepo(), forRepo);
150160
}
151161

152162
/**
153163
* Returns the list of snapshots in the specified repository.
154164
*/
165+
@Deprecated(forRemoval = true)
155166
public List<Entry> forRepo(String repository) {
156-
return entries.getOrDefault(repository, ByRepo.EMPTY).entries;
167+
return entries.getOrDefault(defaultProjectRepo(repository), ByRepo.EMPTY).entries;
168+
}
169+
170+
/**
171+
* Returns the list of snapshots in the specified repository.
172+
*/
173+
public List<Entry> forRepo(ProjectRepo projectRepo) {
174+
return entries.getOrDefault(projectRepo, ByRepo.EMPTY).entries;
157175
}
158176

159177
public boolean isEmpty() {
@@ -178,7 +196,7 @@ public Stream<Entry> asStream() {
178196

179197
@Nullable
180198
public Entry snapshot(final Snapshot snapshot) {
181-
return findSnapshotInList(snapshot, forRepo(snapshot.getRepository()));
199+
return findSnapshotInList(snapshot, forRepo(snapshot.getProjectRepo()));
182200
}
183201

184202
/**
@@ -206,14 +224,32 @@ private static Entry findSnapshotInList(Snapshot snapshotToFind, List<Entry> for
206224
* in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
207225
* for deletion now.
208226
*/
227+
@Deprecated(forRemoval = true)
209228
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(
210229
String repository,
211230
SnapshotsInProgress oldClusterStateSnapshots
231+
) {
232+
return obsoleteGenerations(defaultProjectRepo(repository), oldClusterStateSnapshots);
233+
}
234+
235+
/**
236+
* Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be
237+
* deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance.
238+
* <p>
239+
* An unique shard generation is created for every in-progress shard snapshot. The shard generation file contains information about all
240+
* the files needed by pre-existing and any new shard snapshots that were in-progress. When a shard snapshot is finalized, its file list
241+
* is promoted to the official shard snapshot list for the index shard. This final list will contain metadata about any other
242+
* in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
243+
* for deletion now.
244+
*/
245+
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(
246+
ProjectRepo projectRepo,
247+
SnapshotsInProgress oldClusterStateSnapshots
212248
) {
213249
final Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations = new HashMap<>();
214-
final List<Entry> latestSnapshots = forRepo(repository);
250+
final List<Entry> latestSnapshots = forRepo(projectRepo);
215251

216-
for (Entry oldEntry : oldClusterStateSnapshots.forRepo(repository)) {
252+
for (Entry oldEntry : oldClusterStateSnapshots.forRepo(projectRepo)) {
217253
final Entry matchingLatestEntry = findSnapshotInList(oldEntry.snapshot(), latestSnapshots);
218254
if (matchingLatestEntry == null || matchingLatestEntry == oldEntry) {
219255
// The snapshot progress has not changed.
@@ -412,15 +448,15 @@ private static boolean hasFailures(Map<RepositoryShardId, ShardSnapshotStatus> c
412448
return false;
413449
}
414450

415-
private static boolean assertConsistentEntries(Map<String, ByRepo> entries) {
416-
for (Map.Entry<String, ByRepo> repoEntries : entries.entrySet()) {
451+
private static boolean assertConsistentEntries(Map<ProjectRepo, ByRepo> entries) {
452+
for (Map.Entry<ProjectRepo, ByRepo> repoEntries : entries.entrySet()) {
417453
final Set<Tuple<String, Integer>> assignedShards = new HashSet<>();
418454
final Set<Tuple<String, Integer>> queuedShards = new HashSet<>();
419455
final List<Entry> entriesForRepository = repoEntries.getValue().entries;
420-
final String repository = repoEntries.getKey();
456+
final ProjectRepo repository = repoEntries.getKey();
421457
assert entriesForRepository.isEmpty() == false : "found empty list of snapshots for " + repository + " in " + entries;
422458
for (Entry entry : entriesForRepository) {
423-
assert entry.repository().equals(repository) : "mismatched repository " + entry + " tracked under " + repository;
459+
assert entry.projectRepo().equals(repository) : "mismatched repository " + entry + " tracked under " + repository;
424460
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shard : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
425461
final RepositoryShardId sid = shard.getKey();
426462
final ShardSnapshotStatus shardSnapshotStatus = shard.getValue();
@@ -1241,11 +1277,20 @@ public Entry withStartedShards(Map<ShardId, ShardSnapshotStatus> shards) {
12411277
return updated;
12421278
}
12431279

1280+
@Override
1281+
public ProjectId projectId() {
1282+
return snapshot.getProjectId();
1283+
}
1284+
12441285
@Override
12451286
public String repository() {
12461287
return snapshot.getRepository();
12471288
}
12481289

1290+
public ProjectRepo projectRepo() {
1291+
return new ProjectRepo(projectId(), repository());
1292+
}
1293+
12491294
public Snapshot snapshot() {
12501295
return this.snapshot;
12511296
}
@@ -1391,6 +1436,7 @@ public String toString() {
13911436
@Override
13921437
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
13931438
builder.startObject();
1439+
builder.field("project_id", snapshot.getProjectId());
13941440
builder.field("repository", snapshot.getRepository());
13951441
builder.field("snapshot", snapshot.getSnapshotId().getName());
13961442
builder.field("uuid", snapshot.getSnapshotId().getUUID());
@@ -1725,25 +1771,43 @@ private static final class SnapshotInProgressDiff implements NamedDiff<Custom> {
17251771

17261772
private final SnapshotsInProgress after;
17271773

1728-
private final DiffableUtils.MapDiff<String, ByRepo, Map<String, ByRepo>> mapDiff;
1774+
private final DiffableUtils.MapDiff<ProjectRepo, ByRepo, Map<ProjectRepo, ByRepo>> mapDiff;
17291775
private final Set<String> nodeIdsForRemoval;
17301776

17311777
SnapshotInProgressDiff(SnapshotsInProgress before, SnapshotsInProgress after) {
1732-
this.mapDiff = DiffableUtils.diff(before.entries, after.entries, DiffableUtils.getStringKeySerializer());
1778+
this.mapDiff = DiffableUtils.diff(before.entries, after.entries, PROJECT_REPO_SERIALIZER);
17331779
this.nodeIdsForRemoval = after.nodesIdsForRemoval;
17341780
this.after = after;
17351781
}
17361782

17371783
SnapshotInProgressDiff(StreamInput in) throws IOException {
1738-
this.mapDiff = DiffableUtils.readJdkMapDiff(
1739-
in,
1740-
DiffableUtils.getStringKeySerializer(),
1741-
i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)),
1742-
i -> new ByRepo.ByRepoDiff(
1743-
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new),
1744-
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER)
1745-
)
1746-
);
1784+
if (in.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) {
1785+
final var oldMapDiff = DiffableUtils.readJdkMapDiff(
1786+
in,
1787+
DiffableUtils.getStringKeySerializer(),
1788+
i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)),
1789+
i -> new ByRepo.ByRepoDiff(
1790+
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new),
1791+
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER)
1792+
)
1793+
);
1794+
this.mapDiff = DiffableUtils.jdkMapDiffWithUpdatedKeys(
1795+
oldMapDiff,
1796+
RepositoryOperation::defaultProjectRepo,
1797+
PROJECT_REPO_SERIALIZER
1798+
);
1799+
} else {
1800+
this.mapDiff = DiffableUtils.readJdkMapDiff(
1801+
in,
1802+
PROJECT_REPO_SERIALIZER,
1803+
i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)),
1804+
i -> new ByRepo.ByRepoDiff(
1805+
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new),
1806+
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER)
1807+
)
1808+
);
1809+
}
1810+
17471811
this.nodeIdsForRemoval = readNodeIdsForRemoval(in);
17481812
this.after = null;
17491813
}
@@ -1768,7 +1832,21 @@ public String getWriteableName() {
17681832
public void writeTo(StreamOutput out) throws IOException {
17691833
assert after != null : "should only write instances that were diffed from this node's state";
17701834
if (out.getTransportVersion().onOrAfter(DIFFABLE_VERSION)) {
1771-
mapDiff.writeTo(out);
1835+
if (out.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) {
1836+
DiffableUtils.jdkMapDiffWithUpdatedKeys(mapDiff, projectRepo -> {
1837+
if (ProjectId.DEFAULT.equals(projectRepo.projectId()) == false) {
1838+
throw new IllegalArgumentException(
1839+
"Cannot write instance with non-default project id "
1840+
+ projectRepo.projectId()
1841+
+ " to version before "
1842+
+ TransportVersions.PROJECT_ID_IN_SNAPSHOT
1843+
);
1844+
}
1845+
return projectRepo.repoName();
1846+
}, DiffableUtils.getStringKeySerializer()).writeTo(out);
1847+
} else {
1848+
mapDiff.writeTo(out);
1849+
}
17721850
} else {
17731851
new SimpleDiffable.CompleteDiff<>(after).writeTo(out);
17741852
}

server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,69 @@
88
*/
99
package org.elasticsearch.repositories;
1010

11+
import org.elasticsearch.cluster.DiffableUtils;
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.core.FixForMultiProject;
17+
18+
import java.io.IOException;
19+
1120
/**
1221
* Coordinates of an operation that modifies a repository, assuming that repository at a specific generation.
1322
*/
1423
public interface RepositoryOperation {
1524

25+
/**
26+
* Project for which repository belongs to.
27+
*/
28+
@FixForMultiProject(description = "default implementation is temporary")
29+
default ProjectId projectId() {
30+
return ProjectId.DEFAULT;
31+
}
32+
1633
/**
1734
* Name of the repository affected.
1835
*/
1936
String repository();
2037

38+
default ProjectRepo projectRepo() {
39+
return new ProjectRepo(projectId(), repository());
40+
}
41+
2142
/**
2243
* The repository state id at the time the operation began.
2344
*/
2445
long repositoryStateId();
46+
47+
record ProjectRepo(ProjectId projectId, String repoName) implements Writeable {
48+
49+
public ProjectRepo(StreamInput in) throws IOException {
50+
this(ProjectId.readFrom(in), in.readString());
51+
}
52+
53+
@Override
54+
public void writeTo(StreamOutput out) throws IOException {
55+
projectId.writeTo(out);
56+
out.writeString(repoName);
57+
}
58+
}
59+
60+
DiffableUtils.KeySerializer<ProjectRepo> PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {
61+
@Override
62+
public void writeKey(ProjectRepo key, StreamOutput out) throws IOException {
63+
key.writeTo(out);
64+
}
65+
66+
@Override
67+
public ProjectRepo readKey(StreamInput in) throws IOException {
68+
return new ProjectRepo(in);
69+
}
70+
};
71+
72+
@Deprecated(forRemoval = true)
73+
static ProjectRepo defaultProjectRepo(String repoName) {
74+
return new ProjectRepo(ProjectId.DEFAULT, repoName);
75+
}
2576
}

0 commit comments

Comments
 (0)