Skip to content

Commit 1d0064a

Browse files
thecoop authored and cbuescher committed
Create IndexVersionAllocationDecider (elastic#102708)
Create IndexVersionAllocationDecider as a counterpart to NodeVersionAllocationDecider that checks the max index version rather than the node version.
1 parent e0e6402 commit 1d0064a

File tree

7 files changed

+865 -22 lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
4646
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
4747
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
48+
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
4849
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
4950
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
5051
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
@@ -364,6 +365,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
364365
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(clusterSettings));
365366
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(clusterSettings));
366367
addAllocationDecider(deciders, new EnableAllocationDecider(clusterSettings));
368+
addAllocationDecider(deciders, new IndexVersionAllocationDecider());
367369
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
368370
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
369371
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
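server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/IndexVersionAllocationDecider.java

Lines changed: 142 additions & 0 deletions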
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.routing.allocation.decider;
10+
11+
import org.elasticsearch.cluster.routing.RecoverySource;
12+
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
13+
import org.elasticsearch.cluster.routing.RoutingNode;
14+
import org.elasticsearch.cluster.routing.RoutingNodes;
15+
import org.elasticsearch.cluster.routing.ShardRouting;
16+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
17+
18+
/**
19+
* An allocation decider that prevents relocation or allocation from nodes
20+
* that might not be index compatible. If we relocate from a node that uses
21+
* a newer index version than the node we relocate to this might cause {@link org.apache.lucene.index.IndexFormatTooNewException}
22+
* on the lowest level since it might have already written segments that use a new postings format or codec that is not
23+
* available on the target node.
24+
*/
25+
public class IndexVersionAllocationDecider extends AllocationDecider {
26+
27+
public static final String NAME = "index_version";
28+
29+
@Override
30+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
31+
if (shardRouting.primary()) {
32+
if (shardRouting.currentNodeId() == null) {
33+
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
34+
// restoring from a snapshot - check that the node can handle the version
35+
return isVersionCompatible((SnapshotRecoverySource) shardRouting.recoverySource(), node, allocation);
36+
} else {
37+
// existing or fresh primary on the node
38+
return allocation.decision(Decision.YES, NAME, "no existing allocation, assuming compatible");
39+
}
40+
} else {
41+
// relocating primary, only migrate to newer host
42+
return isIndexVersionCompatibleRelocatePrimary(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation);
43+
}
44+
} else {
45+
final ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting.shardId());
46+
// check that active primary has a newer version so that peer recovery works
47+
if (primary != null) {
48+
return isIndexVersionCompatibleAllocatingReplica(allocation.routingNodes(), primary.currentNodeId(), node, allocation);
49+
} else {
50+
// ReplicaAfterPrimaryActiveAllocationDecider should prevent this case from occurring
51+
return allocation.decision(Decision.YES, NAME, "no active primary shard yet");
52+
}
53+
}
54+
}
55+
56+
@Override
57+
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
58+
return canAllocate(shardRouting, node, allocation);
59+
}
60+
61+
private static Decision isIndexVersionCompatibleRelocatePrimary(
62+
final RoutingNodes routingNodes,
63+
final String sourceNodeId,
64+
final RoutingNode target,
65+
final RoutingAllocation allocation
66+
) {
67+
final RoutingNode source = routingNodes.node(sourceNodeId);
68+
if (target.node().getMaxIndexVersion().onOrAfter(source.node().getMaxIndexVersion())) {
69+
return allocation.decision(
70+
Decision.YES,
71+
NAME,
72+
"can relocate primary shard from a node with index version [%s] to a node with equal-or-newer index version [%s]",
73+
source.node().getMaxIndexVersion().toReleaseVersion(),
74+
target.node().getMaxIndexVersion().toReleaseVersion()
75+
);
76+
} else {
77+
return allocation.decision(
78+
Decision.NO,
79+
NAME,
80+
"cannot relocate primary shard from a node with index version [%s] to a node with older index version [%s]",
81+
source.node().getMaxIndexVersion().toReleaseVersion(),
82+
target.node().getMaxIndexVersion().toReleaseVersion()
83+
);
84+
}
85+
}
86+
87+
private static Decision isIndexVersionCompatibleAllocatingReplica(
88+
final RoutingNodes routingNodes,
89+
final String sourceNodeId,
90+
final RoutingNode target,
91+
final RoutingAllocation allocation
92+
) {
93+
final RoutingNode source = routingNodes.node(sourceNodeId);
94+
if (target.node().getMaxIndexVersion().onOrAfter(source.node().getMaxIndexVersion())) {
95+
/* we can allocate if we can recover from a node that is younger or on the same version
96+
* if the primary is already running on a newer version that won't work due to possible
97+
* differences in the lucene index format etc.*/
98+
return allocation.decision(
99+
Decision.YES,
100+
NAME,
101+
"can allocate replica shard to a node with index version [%s]"
102+
+ " since this is equal-or-newer than the primary index version [%s]",
103+
target.node().getMaxIndexVersion().toReleaseVersion(),
104+
source.node().getMaxIndexVersion().toReleaseVersion()
105+
);
106+
} else {
107+
return allocation.decision(
108+
Decision.NO,
109+
NAME,
110+
"cannot allocate replica shard to a node with index version [%s]"
111+
+ " since this is older than the primary index version [%s]",
112+
target.node().getMaxIndexVersion().toReleaseVersion(),
113+
source.node().getMaxIndexVersion().toReleaseVersion()
114+
);
115+
}
116+
}
117+
118+
private static Decision isVersionCompatible(
119+
SnapshotRecoverySource recoverySource,
120+
final RoutingNode target,
121+
final RoutingAllocation allocation
122+
) {
123+
if (target.node().getMaxIndexVersion().onOrAfter(recoverySource.version())) {
124+
/* we can allocate if we can restore from a snapshot that is older or on the same version */
125+
return allocation.decision(
126+
Decision.YES,
127+
NAME,
128+
"max supported index version [%s] is the same or newer than snapshot version [%s]",
129+
target.node().getMaxIndexVersion().toReleaseVersion(),
130+
recoverySource.version().toReleaseVersion()
131+
);
132+
} else {
133+
return allocation.decision(
134+
Decision.NO,
135+
NAME,
136+
"max supported index version [%s] is older than the snapshot version [%s]",
137+
target.node().getMaxIndexVersion().toReleaseVersion(),
138+
recoverySource.version().toReleaseVersion()
139+
);
140+
}
141+
}
142+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
3030
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
3131
if (shardRouting.primary()) {
3232
if (shardRouting.currentNodeId() == null) {
33-
if (shardRouting.recoverySource() != null && shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
33+
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
3434
// restoring from a snapshot - check that the node can handle the version
3535
return isVersionCompatible((SnapshotRecoverySource) shardRouting.recoverySource(), node, allocation);
3636
} else {

server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
2222
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2323
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
24+
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
2425
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
2526
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
2627
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
@@ -49,17 +50,20 @@
4950
import org.elasticsearch.test.gateway.TestGatewayAllocator;
5051
import org.elasticsearch.threadpool.TestThreadPool;
5152
import org.elasticsearch.threadpool.ThreadPool;
53+
import org.hamcrest.Matcher;
54+
import org.hamcrest.Matchers;
5255
import org.junit.AfterClass;
5356
import org.junit.BeforeClass;
5457

55-
import java.util.Arrays;
5658
import java.util.Collection;
5759
import java.util.Collections;
58-
import java.util.Iterator;
5960
import java.util.List;
6061
import java.util.Map;
6162
import java.util.concurrent.TimeUnit;
6263
import java.util.function.Supplier;
64+
import java.util.stream.Stream;
65+
66+
import static org.hamcrest.Matchers.contains;
6367

6468
public class ClusterModuleTests extends ModuleTestCase {
6569
private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
@@ -232,14 +236,15 @@ public void testShardsAllocatorFactoryNull() {
232236
// running them. If the order of the deciders is changed for a valid reason, the order should be
233237
// changed in the test too.
234238
public void testAllocationDeciderOrder() {
235-
List<Class<? extends AllocationDecider>> expectedDeciders = Arrays.asList(
239+
Stream<Class<? extends AllocationDecider>> expectedDeciders = Stream.of(
236240
MaxRetryAllocationDecider.class,
237241
ResizeAllocationDecider.class,
238242
ReplicaAfterPrimaryActiveAllocationDecider.class,
239243
RebalanceOnlyWhenActiveAllocationDecider.class,
240244
ClusterRebalanceAllocationDecider.class,
241245
ConcurrentRebalanceAllocationDecider.class,
242246
EnableAllocationDecider.class,
247+
IndexVersionAllocationDecider.class,
243248
NodeVersionAllocationDecider.class,
244249
SnapshotInProgressAllocationDecider.class,
245250
RestoreInProgressAllocationDecider.class,
@@ -257,12 +262,7 @@ public void testAllocationDeciderOrder() {
257262
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
258263
Collections.emptyList()
259264
);
260-
Iterator<AllocationDecider> iter = deciders.iterator();
261-
int idx = 0;
262-
while (iter.hasNext()) {
263-
AllocationDecider decider = iter.next();
264-
assertSame(decider.getClass(), expectedDeciders.get(idx++));
265-
}
265+
assertThat(deciders, contains(expectedDeciders.<Matcher<? super AllocationDecider>>map(Matchers::instanceOf).toList()));
266266
}
267267

268268
public void testRejectsReservedExistingShardsAllocatorName() {

0 commit comments

Comments
 (0)