Skip to content

Commit eb574de

Browse files
committed
add shrink group allocation decider
1 parent bdc2077 commit eb574de

File tree

8 files changed

+268
-14
lines changed

8 files changed

+268
-14
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
6060
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
6161
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
62+
import org.elasticsearch.cluster.routing.allocation.decider.ShrinkGroupAllocationDecider;
6263
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
6364
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
6465
import org.elasticsearch.cluster.service.ClusterService;
@@ -411,6 +412,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
411412
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
412413
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
413414
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
415+
addAllocationDecider(deciders, new ShrinkGroupAllocationDecider());
414416
addAllocationDecider(deciders, new SameShardAllocationDecider(clusterSettings));
415417
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
416418
addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.block.ClusterBlock;
2323
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2424
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
25+
import org.elasticsearch.cluster.node.NodeGroup;
2526
import org.elasticsearch.cluster.routing.IndexRouting;
2627
import org.elasticsearch.cluster.routing.allocation.DataTier;
2728
import org.elasticsearch.cluster.routing.allocation.IndexMetadataUpdater;
@@ -449,6 +450,8 @@ public Iterator<Setting<?>> settings() {
449450
public static final String INDEX_ROUTING_REQUIRE_GROUP_PREFIX = "index.routing.allocation.require";
450451
public static final String INDEX_ROUTING_INCLUDE_GROUP_PREFIX = "index.routing.allocation.include";
451452
public static final String INDEX_ROUTING_EXCLUDE_GROUP_PREFIX = "index.routing.allocation.exclude";
453+
public static final String INDEX_ROUTING_INITIAL_RECOVERY_GROUP = "index.routing.allocation.initial_recovery_group";
454+
public static final String INDEX_ROUTING_SHRINK_GROUP = "index.routing.allocation.shink_group";
452455

453456
public static final Setting.AffixSetting<List<String>> INDEX_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
454457
INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".",
@@ -464,8 +467,10 @@ public Iterator<Setting<?>> settings() {
464467
);
465468
public static final Setting.AffixSetting<List<String>> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING = Setting.prefixKeySetting(
466469
"index.routing.allocation.initial_recovery.",
467-
Setting::stringListSetting
470+
key -> Setting.stringListSetting(key, Property.IndexScope, Property.DeprecatedWarning)
468471
);
472+
public static final Setting<List<String>> INDEX_ROUTING_INITIAL_RECOVERY_NODE_GROUP_SETTING = Setting.stringListSetting(INDEX_ROUTING_INITIAL_RECOVERY_GROUP, Property.IndexScope);
473+
public static final Setting<List<String>> INDEX_ROUTING_SHRINK_GROUP_SETTING = Setting.stringListSetting(INDEX_ROUTING_SHRINK_GROUP, Property.Dynamic, Property.IndexScope);
469474

470475
/**
471476
* The number of active shard copies to check for before proceeding with a write operation.
@@ -605,6 +610,8 @@ public Iterator<Setting<?>> settings() {
605610
private final DiscoveryNodeFilters includeFilters;
606611
private final DiscoveryNodeFilters excludeFilters;
607612
private final DiscoveryNodeFilters initialRecoveryFilters;
613+
private final NodeGroup initialRecoveryNodeGroup;
614+
private final NodeGroup shrinkGroup;
608615

609616
private final IndexVersion indexCreatedVersion;
610617
private final IndexVersion mappingsUpdatedVersion;
@@ -673,8 +680,10 @@ private IndexMetadata(
673680
final Map<Integer, Set<String>> inSyncAllocationIds,
674681
final DiscoveryNodeFilters requireFilters,
675682
final DiscoveryNodeFilters initialRecoveryFilters,
683+
final NodeGroup initialRecoveryNodeGroup,
676684
final DiscoveryNodeFilters includeFilters,
677685
final DiscoveryNodeFilters excludeFilters,
686+
final NodeGroup shrinkGroup,
678687
final IndexVersion indexCreatedVersion,
679688
final IndexVersion mappingsUpdatedVersion,
680689
final int routingNumShards,
@@ -729,6 +738,8 @@ private IndexMetadata(
729738
this.includeFilters = includeFilters;
730739
this.excludeFilters = excludeFilters;
731740
this.initialRecoveryFilters = initialRecoveryFilters;
741+
this.initialRecoveryNodeGroup = initialRecoveryNodeGroup;
742+
this.shrinkGroup = shrinkGroup;
732743
this.indexCreatedVersion = indexCreatedVersion;
733744
this.routingNumShards = routingNumShards;
734745
this.routingFactor = routingNumShards / numberOfShards;
@@ -785,8 +796,10 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
785796
this.inSyncAllocationIds,
786797
this.requireFilters,
787798
this.initialRecoveryFilters,
799+
this.initialRecoveryNodeGroup,
788800
this.includeFilters,
789801
this.excludeFilters,
802+
this.shrinkGroup,
790803
this.indexCreatedVersion,
791804
this.mappingsUpdatedVersion,
792805
this.routingNumShards,
@@ -846,8 +859,10 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
846859
Maps.copyMapWithAddedOrReplacedEntry(this.inSyncAllocationIds, shardId, Set.copyOf(inSyncSet)),
847860
this.requireFilters,
848861
this.initialRecoveryFilters,
862+
this.initialRecoveryNodeGroup,
849863
this.includeFilters,
850864
this.excludeFilters,
865+
this.shrinkGroup,
851866
this.indexCreatedVersion,
852867
this.mappingsUpdatedVersion,
853868
this.routingNumShards,
@@ -905,8 +920,10 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
905920
this.inSyncAllocationIds,
906921
this.requireFilters,
907922
this.initialRecoveryFilters,
923+
this.initialRecoveryNodeGroup,
908924
this.includeFilters,
909925
this.excludeFilters,
926+
this.shrinkGroup,
910927
this.indexCreatedVersion,
911928
this.mappingsUpdatedVersion,
912929
this.routingNumShards,
@@ -965,8 +982,10 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind
965982
this.inSyncAllocationIds,
966983
this.requireFilters,
967984
this.initialRecoveryFilters,
985+
this.initialRecoveryNodeGroup,
968986
this.includeFilters,
969987
this.excludeFilters,
988+
this.shrinkGroup,
970989
this.indexCreatedVersion,
971990
this.mappingsUpdatedVersion,
972991
this.routingNumShards,
@@ -1020,8 +1039,10 @@ public IndexMetadata withIncrementedVersion() {
10201039
this.inSyncAllocationIds,
10211040
this.requireFilters,
10221041
this.initialRecoveryFilters,
1042+
this.initialRecoveryNodeGroup,
10231043
this.includeFilters,
10241044
this.excludeFilters,
1045+
this.shrinkGroup,
10251046
this.indexCreatedVersion,
10261047
this.mappingsUpdatedVersion,
10271048
this.routingNumShards,
@@ -1278,6 +1299,10 @@ public Index getResizeSourceIndex() {
12781299
: null;
12791300
}
12801301

1302+
public NodeGroup getShrinkGroup() {
1303+
return shrinkGroup;
1304+
}
1305+
12811306
public static final String INDEX_DOWNSAMPLE_SOURCE_UUID_KEY = "index.downsample.source.uuid";
12821307
public static final String INDEX_DOWNSAMPLE_SOURCE_NAME_KEY = "index.downsample.source.name";
12831308
public static final String INDEX_DOWNSAMPLE_ORIGIN_NAME_KEY = "index.downsample.origin.name";
@@ -1368,6 +1393,12 @@ public DiscoveryNodeFilters getInitialRecoveryFilters() {
13681393
return initialRecoveryFilters;
13691394
}
13701395

1396+
@Nullable
1397+
public NodeGroup getInitialRecoveryNodeGroup() {
1398+
return initialRecoveryNodeGroup;
1399+
}
1400+
1401+
13711402
@Nullable
13721403
public DiscoveryNodeFilters includeFilters() {
13731404
return includeFilters;
@@ -1377,6 +1408,10 @@ public DiscoveryNodeFilters includeFilters() {
13771408
public DiscoveryNodeFilters excludeFilters() {
13781409
return excludeFilters;
13791410
}
1411+
@Nullable
1412+
public NodeGroup shrinkGroup() {
1413+
return shrinkGroup;
1414+
}
13801415

13811416
public IndexLongFieldRange getTimestampRange() {
13821417
return timestampRange;
@@ -2253,13 +2288,36 @@ IndexMetadata build(boolean repair) {
22532288
} else {
22542289
excludeFilters = DiscoveryNodeFilters.buildFromKeyValues(OR, excludeMap);
22552290
}
2291+
var shrinkGroupNodes = INDEX_ROUTING_SHRINK_GROUP_SETTING.get(settings);
2292+
final NodeGroup shrinkGroup;
2293+
if (shrinkGroupNodes.isEmpty()) {
2294+
shrinkGroup = null;
2295+
} else {
2296+
shrinkGroup = NodeGroup.buildFromNodeIDs(shrinkGroupNodes);
2297+
}
2298+
var initialRecoveryGroupNodes = INDEX_ROUTING_INITIAL_RECOVERY_NODE_GROUP_SETTING.get(settings);
2299+
final NodeGroup initialRecoveryGroup;
2300+
if (initialRecoveryGroupNodes.isEmpty()) {
2301+
initialRecoveryGroup = null;
2302+
} else {
2303+
initialRecoveryGroup = NodeGroup.buildFromNodeIDs(initialRecoveryGroupNodes);
2304+
}
22562305
var initialRecoveryMap = INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getAsMap(settings);
22572306
final DiscoveryNodeFilters initialRecoveryFilters;
22582307
if (initialRecoveryMap.isEmpty()) {
22592308
initialRecoveryFilters = null;
22602309
} else {
22612310
initialRecoveryFilters = DiscoveryNodeFilters.buildFromKeyValues(OR, initialRecoveryMap);
22622311
}
2312+
if (initialRecoveryGroup != null && initialRecoveryFilters != null) {
2313+
throw new IllegalStateException(
2314+
"cannot create an index with both ["
2315+
+ INDEX_ROUTING_INITIAL_RECOVERY_NODE_GROUP_SETTING.getKey()
2316+
+ "] and ["
2317+
+ INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey()
2318+
+ "] set"
2319+
);
2320+
}
22632321
IndexVersion indexCreatedVersion = indexCreatedVersion(settings);
22642322

22652323
if (primaryTerms == null) {
@@ -2361,8 +2419,10 @@ IndexMetadata build(boolean repair) {
23612419
Map.ofEntries(denseInSyncAllocationIds),
23622420
requireFilters,
23632421
initialRecoveryFilters,
2422+
initialRecoveryGroup,
23642423
includeFilters,
23652424
excludeFilters,
2425+
shrinkGroup,
23662426
indexCreatedVersion,
23672427
mappingsUpdatedVersion,
23682428
getRoutingNumShards(),
@@ -2991,18 +3051,19 @@ public static Set<ShardId> selectShrinkShards(int shardId, IndexMetadata sourceI
29913051
"the number of target shards (" + numTargetShards + ") must be greater than the shard id: " + shardId
29923052
);
29933053
}
2994-
if (sourceIndexMetadata.getNumberOfShards() < numTargetShards) {
3054+
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
3055+
if (numSourceShards < numTargetShards) {
29953056
throw new IllegalArgumentException(
29963057
"the number of target shards ["
29973058
+ numTargetShards
29983059
+ "] must be less that the number of source shards ["
2999-
+ sourceIndexMetadata.getNumberOfShards()
3060+
+ numSourceShards
30003061
+ "]"
30013062
);
30023063
}
3003-
int routingFactor = getRoutingFactor(sourceIndexMetadata.getNumberOfShards(), numTargetShards);
3064+
int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
30043065
Set<ShardId> shards = Sets.newHashSetWithExpectedSize(routingFactor);
3005-
for (int i = shardId * routingFactor; i < routingFactor * shardId + routingFactor; i++) {
3066+
for (int i = shardId; i < numSourceShards; i += numTargetShards) {
30063067
shards.add(new ShardId(sourceIndexMetadata.getIndex(), i));
30073068
}
30083069
return shards;

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.block.ClusterBlocks;
3232
import org.elasticsearch.cluster.node.DiscoveryNode;
3333
import org.elasticsearch.cluster.node.DiscoveryNodes;
34+
import org.elasticsearch.cluster.node.NodeGroup;
3435
import org.elasticsearch.cluster.routing.GlobalRoutingTable;
3536
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3637
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -110,6 +111,8 @@
110111
import static java.util.stream.Collectors.toList;
111112
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
112113
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
114+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INITIAL_RECOVERY_GROUP;
115+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_SHRINK_GROUP;
113116
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY;
114117
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS;
115118
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
@@ -1574,6 +1577,42 @@ private static List<String> validateIndexCustomPath(Settings settings, @Nullable
15741577
return validationErrors;
15751578
}
15761579

1580+
static void validateShrinkGroup(
1581+
ProjectMetadata projectMetadata,
1582+
ClusterBlocks clusterBlocks,
1583+
RoutingTable routingTable,
1584+
String sourceIndex,
1585+
String targetIndexName,
1586+
Settings targetIndexSettings,
1587+
NodeGroup shrinkNodeGroup
1588+
) {
1589+
IndexMetadata sourceMetadata = validateResize(projectMetadata, clusterBlocks, sourceIndex, targetIndexName, targetIndexSettings);
1590+
if (sourceMetadata.isSearchableSnapshot()) {
1591+
throw new IllegalArgumentException("can't shrink searchable snapshot index [" + sourceIndex + ']');
1592+
}
1593+
assert INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings);
1594+
IndexMetadata.selectShrinkShards(0, sourceMetadata, INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings));
1595+
1596+
if (sourceMetadata.getNumberOfShards() == 1) {
1597+
throw new IllegalArgumentException("can't shrink an index with only one shard");
1598+
}
1599+
1600+
final IndexRoutingTable table = routingTable.index(sourceIndex);
1601+
for (int i = 0; i < sourceMetadata.getNumberOfShards(); i++) {
1602+
String groupNode = shrinkNodeGroup.nodeForShard(i);
1603+
if (!table.shard(i).activeShards().stream().anyMatch((routing) -> (routing.currentNodeId().equals(groupNode)))){
1604+
throw new IllegalArgumentException("source shard [" + i + "] is not allocated and active on node [" + groupNode + "]");
1605+
}
1606+
}
1607+
1608+
// check that all source shards are allocated correctly in the node group.
1609+
Map<String, AtomicInteger> nodesToNumRouting = new HashMap<>();
1610+
int numShards = sourceMetadata.getNumberOfShards();
1611+
for (ShardRouting routing : table.shardsWithState(ShardRoutingState.STARTED)) {
1612+
nodesToNumRouting.computeIfAbsent(routing.currentNodeId(), (s) -> new AtomicInteger(0)).incrementAndGet();
1613+
}
1614+
}
1615+
15771616
/**
15781617
* Validates the settings and mappings for shrinking an index.
15791618
*
@@ -1725,21 +1764,38 @@ static void prepareResizeIndexSettings(
17251764
) {
17261765
final IndexMetadata sourceMetadata = projectMetadata.index(resizeSourceIndex.getName());
17271766
if (type == ResizeType.SHRINK) {
1728-
final List<String> nodesToAllocateOn = validateShrinkIndex(
1729-
projectMetadata,
1730-
clusterBlocks,
1731-
routingTable,
1732-
resizeSourceIndex.getName(),
1733-
resizeIntoName,
1734-
indexSettingsBuilder.build()
1735-
);
1736-
indexSettingsBuilder.put(INDEX_SHRINK_INITIAL_RECOVERY_KEY, Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()));
1767+
if (sourceMetadata.getShrinkGroup() != null) {
1768+
validateShrinkGroup(
1769+
projectMetadata,
1770+
clusterBlocks,
1771+
routingTable,
1772+
resizeSourceIndex.getName(),
1773+
resizeIntoName,
1774+
indexSettingsBuilder.build(),
1775+
sourceMetadata.getShrinkGroup()
1776+
);
1777+
indexSettingsBuilder.putNull(INDEX_SHRINK_INITIAL_RECOVERY_KEY);
1778+
indexSettingsBuilder.putList(INDEX_ROUTING_INITIAL_RECOVERY_GROUP, sourceMetadata.getShrinkGroup().toList());
1779+
} else {
1780+
final List<String> nodesToAllocateOn = validateShrinkIndex(
1781+
projectMetadata,
1782+
clusterBlocks,
1783+
routingTable,
1784+
resizeSourceIndex.getName(),
1785+
resizeIntoName,
1786+
indexSettingsBuilder.build()
1787+
);
1788+
indexSettingsBuilder.put(INDEX_SHRINK_INITIAL_RECOVERY_KEY, Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()));
1789+
indexSettingsBuilder.putNull(INDEX_ROUTING_INITIAL_RECOVERY_GROUP);
1790+
}
17371791
} else if (type == ResizeType.SPLIT) {
17381792
validateSplitIndex(projectMetadata, clusterBlocks, resizeSourceIndex.getName(), resizeIntoName, indexSettingsBuilder.build());
17391793
indexSettingsBuilder.putNull(INDEX_SHRINK_INITIAL_RECOVERY_KEY);
1794+
indexSettingsBuilder.putNull(INDEX_ROUTING_INITIAL_RECOVERY_GROUP);
17401795
} else if (type == ResizeType.CLONE) {
17411796
validateCloneIndex(projectMetadata, clusterBlocks, resizeSourceIndex.getName(), resizeIntoName, indexSettingsBuilder.build());
17421797
indexSettingsBuilder.putNull(INDEX_SHRINK_INITIAL_RECOVERY_KEY);
1798+
indexSettingsBuilder.putNull(INDEX_ROUTING_INITIAL_RECOVERY_GROUP);
17431799
} else {
17441800
throw new IllegalStateException("unknown resize type is " + type);
17451801
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.elasticsearch.cluster.node;
2+
3+
import java.util.List;
4+
5+
public class NodeGroup {
6+
private final List<String> nodes;
7+
8+
public static NodeGroup buildFromNodeIDs(List<String> nodes) {
9+
return new NodeGroup(nodes);
10+
}
11+
12+
private NodeGroup(List<String> nodes) {
13+
this.nodes = nodes;
14+
}
15+
16+
public List<String> toList() {
17+
return this.nodes;
18+
}
19+
20+
public String nodeForShard(int shard) {
21+
if (this.nodes.size() == 0) {
22+
return null;
23+
}
24+
return this.nodes.get(shard % this.nodes.size());
25+
}
26+
27+
public boolean matchShard(String nodeId, int shard) {
28+
String node = nodeForShard(shard);
29+
if (node == null) {
30+
return false;
31+
}
32+
return node.equals(nodeId);
33+
}
34+
35+
public boolean containsNode(String nodeId) {
36+
return this.nodes.contains(nodeId);
37+
}
38+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/ResizeSourceIndexSettingsUpdater.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public Metadata applyChanges(Metadata metadata, GlobalRoutingTable routingTable)
5151

5252
Settings.Builder builder = Settings.builder().put(indexMetadata.getSettings());
5353
builder.remove(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY);
54+
builder.remove(IndexMetadata.INDEX_ROUTING_INITIAL_RECOVERY_GROUP);
5455
builder.remove(IndexMetadata.INDEX_RESIZE_SOURCE_UUID_KEY);
5556
if (Strings.isNullOrEmpty(indexMetadata.getLifecyclePolicyName())) {
5657
// Required by ILM after an index has been shrunk

0 commit comments

Comments
 (0)