Skip to content

Commit ad8e22d

Browse files
committed
Fix diff building for combined cluster and project persistent tasks (MP-1965)
When the combined tasks diff is used, we need to remove the separate project and cluster tasks from the original diff. Relates: MP-1945, MP-1938
1 parent b882c07 commit ad8e22d

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashMap;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.Objects;
2829
import java.util.Set;
2930
import java.util.function.Consumer;
3031
import java.util.function.Function;
@@ -120,15 +121,31 @@ public static <K, T, T1 extends T, T2 extends T, M extends Map<K, T>> MapDiff<K,
120121
}
121122

122123
/**
123-
* Create a new MapDiff by removing the key from any of its deletes, diffs and upserts
124+
* Create a new MapDiff by removing the keys from any of its deletes, diffs and upserts
124125
*/
125-
public static <K, T, M extends Map<K, T>> MapDiff<K, T, M> removeKey(MapDiff<K, T, M> diff, K key) {
126-
final List<K> deletes = diff.getDeletes().stream().filter(k -> k.equals(key) == false).toList();
127-
final List<Map.Entry<K, Diff<T>>> diffs = diff.getDiffs().stream().filter(entry -> entry.getKey().equals(key) == false).toList();
128-
final List<Map.Entry<K, T>> upserts = diff.getUpserts().stream().filter(entry -> entry.getKey().equals(key) == false).toList();
126+
public static <K, T, M extends Map<K, T>> MapDiff<K, T, M> removeKeys(MapDiff<K, T, M> diff, Set<K> keys) {
127+
final List<K> deletes = diff.getDeletes().stream().filter(k -> keys.contains(k) == false).toList();
128+
final List<Map.Entry<K, Diff<T>>> diffs = diff.getDiffs().stream().filter(entry -> keys.contains(entry.getKey()) == false).toList();
129+
final List<Map.Entry<K, T>> upserts = diff.getUpserts().stream().filter(entry -> keys.contains(entry.getKey()) == false).toList();
129130
return new MapDiff<>(diff.keySerializer, diff.valueSerializer, deletes, diffs, upserts, diff.builderCtor);
130131
}
131132

133+
/**
134+
* Check whether the specified MapDiff has any changes associated with the specified key
135+
*/
136+
public static <K, T, M extends Map<K, T>> boolean hasKey(MapDiff<K, T, M> diff, K key) {
137+
if (diff.getDeletes().contains(key)) {
138+
return true;
139+
}
140+
if (diff.getDiffs().stream().map(Map.Entry::getKey).anyMatch(k -> Objects.equals(k, key))) {
141+
return true;
142+
}
143+
if (diff.getUpserts().stream().map(Map.Entry::getKey).anyMatch(k -> Objects.equals(k, key))) {
144+
return true;
145+
}
146+
return false;
147+
}
148+
132149
/**
133150
* Creates a MapDiff that applies a single entry diff to a map
134151
*/

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -998,12 +998,20 @@ public void writeTo(StreamOutput out) throws IOException {
998998
if (combinedTasksDiff == null) {
999999
// No combined diff means either (1) no tasks are involved or (2) the diff is from an old node
10001000
// In both cases, we can proceed without further changes.
1001+
// For both cases, no cluster persistent tasks should exist in the merged diff
1002+
assert DiffableUtils.hasKey(mergedClusterAndProjectCustomDiff, ClusterPersistentTasksCustomMetadata.TYPE) == false;
1003+
// Unless it is (2), the merge diff should not contain project persistent tasks
1004+
assert fromNodeBeforeMultiProjectsSupport
1005+
|| DiffableUtils.hasKey(mergedClusterAndProjectCustomDiff, PersistentTasksCustomMetadata.TYPE) == false;
10011006
return mergedClusterAndProjectCustomDiff;
10021007
} else {
10031008
// We need first delete the persistent tasks entries from the diffs by cluster and project customs themselves.
10041009
// Then add the combined tasks diff to the result.
10051010
return DiffableUtils.merge(
1006-
DiffableUtils.removeKey(mergedClusterAndProjectCustomDiff, PersistentTasksCustomMetadata.TYPE),
1011+
DiffableUtils.removeKeys(
1012+
mergedClusterAndProjectCustomDiff,
1013+
Set.of(PersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata.TYPE)
1014+
),
10071015
combinedTasksDiff,
10081016
DiffableUtils.getStringKeySerializer(),
10091017
BWC_CUSTOM_VALUE_SERIALIZER

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataPersistentTasksTests.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
public class MetadataPersistentTasksTests extends ESTestCase {
4949

5050
private NamedWriteableRegistry namedWriteableRegistry;
51+
private NamedWriteableRegistry namedWriteableRegistryBwc;
5152

5253
@Before
5354
public void initializeRegistries() {
@@ -74,6 +75,25 @@ public void initializeRegistries() {
7475
)
7576
).toList()
7677
);
78+
namedWriteableRegistryBwc = new NamedWriteableRegistry(
79+
Stream.concat(
80+
ClusterModule.getNamedWriteables()
81+
.stream()
82+
.filter(entry -> entry.name.equals(ClusterPersistentTasksCustomMetadata.TYPE) == false),
83+
Stream.of(
84+
new NamedWriteableRegistry.Entry(
85+
PersistentTaskParams.class,
86+
TestClusterPersistentTasksParams.NAME,
87+
TestClusterPersistentTasksParams::new
88+
),
89+
new NamedWriteableRegistry.Entry(
90+
PersistentTaskParams.class,
91+
TestProjectPersistentTasksParams.NAME,
92+
TestProjectPersistentTasksParams::new
93+
)
94+
)
95+
).toList()
96+
);
7797
}
7898

7999
public void testPersistentTasksSerialization() throws IOException {
@@ -114,7 +134,7 @@ public void testPersistentTasksSerializationBwc() throws IOException {
114134
out.setTransportVersion(previousVersion);
115135
orig.writeTo(out);
116136

117-
final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry);
137+
final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistryBwc);
118138
in.setTransportVersion(previousVersion);
119139
final Metadata fromStream = Metadata.readFrom(in);
120140

@@ -148,7 +168,7 @@ public void testPersistentTasksDiffSerializationBwc() throws IOException {
148168
out.setTransportVersion(previousVersion);
149169
diff.writeTo(out);
150170

151-
final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry);
171+
final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistryBwc);
152172
in.setTransportVersion(previousVersion);
153173
final Diff<Metadata> diffFromStream = Metadata.readDiffFrom(in);
154174
final Metadata metadataFromDiff = diffFromStream.apply(before);

0 commit comments

Comments
 (0)