Skip to content

Commit 1f86bd8

Browse files
committed
Add Virtual Shards routing and mapping overrides
Implements the initial routing foundation for Virtual Shards. Adds settings validation for virtual shards and routing overrides via IndexMetadata custom data.
1 parent 42abf94 commit 1f86bd8

File tree

6 files changed

+278
-0
lines changed

6 files changed

+278
-0
lines changed

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,18 @@ static Setting<Integer> buildNumberOfShardsSetting() {
264264

265265
public static final String INDEX_SETTING_PREFIX = "index.";
266266
public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards";
267+
public static final String SETTING_NUMBER_OF_VIRTUAL_SHARDS = "index.number_of_virtual_shards";
267268
static final String DEFAULT_NUMBER_OF_SHARDS = "opensearch.index.default_number_of_shards";
268269
static final String MAX_NUMBER_OF_SHARDS = "opensearch.index.max_number_of_shards";
269270
public static final Setting<Integer> INDEX_NUMBER_OF_SHARDS_SETTING = buildNumberOfShardsSetting();
271+
/**
 * Typed definition of the {@code index.number_of_virtual_shards} setting.
 * It is declared index-scoped and final, and its default is -1, the value
 * that {@code getNumberOfVirtualShards()} documents as "virtual shards disabled".
 * NOTE(review): the second -1 is presumably the minimum accepted value of the
 * (key, default, min, properties...) overload of Setting.intSetting - confirm.
 */
public static final Setting<Integer> INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING = Setting.intSetting(
272+
SETTING_NUMBER_OF_VIRTUAL_SHARDS,
273+
-1,
274+
-1,
275+
Property.IndexScope,
276+
Property.Final
277+
);
278+
270279
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
271280
public static final Setting<Integer> INDEX_NUMBER_OF_REPLICAS_SETTING = Setting.intSetting(
272281
SETTING_NUMBER_OF_REPLICAS,
@@ -1288,6 +1297,16 @@ public int getNumberOfShards() {
12881297
return numberOfShards;
12891298
}
12901299

1300+
/**
1301+
* Returns the number of virtual shards for this index.
1302+
* Returns -1 if virtual shards are disabled.
1303+
*
1304+
* @return the number of virtual shards or -1
1305+
*/
1306+
public int getNumberOfVirtualShards() {
1307+
return settings.getAsInt(SETTING_NUMBER_OF_VIRTUAL_SHARDS, -1);
1308+
}
1309+
12911310
public int getNumberOfReplicas() {
12921311
return numberOfReplicas;
12931312
}
@@ -2150,6 +2169,19 @@ public IndexMetadata build() {
21502169
);
21512170
}
21522171

2172+
final int numberOfVirtualShards = INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING.get(settings);
2173+
if (numberOfVirtualShards != -1 && numberOfVirtualShards < numberOfShards) {
2174+
throw new IllegalArgumentException(
2175+
"number of virtual shards ["
2176+
+ numberOfVirtualShards
2177+
+ "] must be >= number of shards ["
2178+
+ numberOfShards
2179+
+ "] for ["
2180+
+ index
2181+
+ "]"
2182+
);
2183+
}
2184+
21532185
// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
21542186
final Map<Integer, Set<String>> filledInSyncAllocationIds = new HashMap<>();
21552187
for (int i = 0; i < numberOfShards; i++) {
server/src/main/java/org/opensearch/cluster/metadata/VirtualShardRoutingHelper.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.annotation.PublicApi;
14+
15+
import java.util.Map;
16+
17+
/**
18+
* Resolves virtual shard routing to physical shard IDs.
19+
*
20+
* @opensearch.api
21+
*/
22+
@PublicApi(since = "3.6.0")
23+
public final class VirtualShardRoutingHelper {
24+
25+
private VirtualShardRoutingHelper() {}
26+
27+
private static final Logger logger = LogManager.getLogger(VirtualShardRoutingHelper.class);
28+
29+
/**
30+
* Custom Metadata key for storing virtual shard routing overrides.
31+
*/
32+
public static final String VIRTUAL_SHARDS_CUSTOM_METADATA_KEY = "virtual_shards_routing";
33+
34+
/**
35+
* Resolves the Physical Shard ID for a given Virtual Shard ID.
36+
*
37+
* @param indexMetadata The index metadata.
38+
* @param vShardId The virtual shard ID.
39+
* @return The physical shard ID.
40+
*/
41+
public static int resolvePhysicalShardId(IndexMetadata indexMetadata, int vShardId) {
42+
Map<String, String> overrides = indexMetadata.getCustomData(VIRTUAL_SHARDS_CUSTOM_METADATA_KEY);
43+
if (overrides != null) {
44+
String pShardIdStr = overrides.get(String.valueOf(vShardId));
45+
if (pShardIdStr != null) {
46+
try {
47+
int pShardId = Integer.parseInt(pShardIdStr);
48+
if (pShardId >= 0 && pShardId < indexMetadata.getNumberOfShards()) {
49+
return pShardId;
50+
}
51+
logger.trace("Invalid override value [{}] for vShard [{}]: out of bounds", pShardId, vShardId);
52+
} catch (NumberFormatException e) {
53+
logger.trace("Invalid override value [{}] for vShard [{}]: not a number", pShardIdStr, vShardId);
54+
}
55+
}
56+
}
57+
58+
return Math.floorMod(vShardId, indexMetadata.getNumberOfShards());
59+
}
60+
}

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.util.CollectionUtil;
3636
import org.opensearch.cluster.ClusterState;
3737
import org.opensearch.cluster.metadata.IndexMetadata;
38+
import org.opensearch.cluster.metadata.VirtualShardRoutingHelper;
3839
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
3940
import org.opensearch.cluster.node.DiscoveryNodes;
4041
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
@@ -521,6 +522,13 @@ public static int generateShardId(IndexMetadata indexMetadata, @Nullable String
521522
partitionOffset = 0;
522523
}
523524

525+
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
526+
if (numVirtualShards != -1) {
527+
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
528+
int vShardId = Math.floorMod(hash, numVirtualShards);
529+
return VirtualShardRoutingHelper.resolvePhysicalShardId(indexMetadata, vShardId);
530+
}
531+
524532
return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
525533
}
526534

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
105105
IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING,
106106
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
107107
IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING,
108+
IndexMetadata.INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING,
108109
IndexMetadata.INDEX_READ_ONLY_SETTING,
109110
IndexMetadata.INDEX_BLOCKS_READ_SETTING,
110111
IndexMetadata.INDEX_BLOCKS_WRITE_SETTING,
server/src/test/java/org/opensearch/cluster/metadata/VirtualShardRoutingHelperTests.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
public class VirtualShardRoutingHelperTests extends OpenSearchTestCase {
19+
20+
public void testResolvePhysicalShardIdDefaultModulo() {
21+
int numPhysicalShards = 5;
22+
IndexMetadata metadata = IndexMetadata.builder("test")
23+
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
24+
.numberOfShards(numPhysicalShards)
25+
.numberOfReplicas(1)
26+
.build();
27+
28+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 0));
29+
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
30+
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 4));
31+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 5));
32+
}
33+
34+
public void testResolvePhysicalShardIdWithOverrides() {
35+
int numPhysicalShards = 5;
36+
Map<String, String> overrides = new HashMap<>();
37+
overrides.put("7", "1");
38+
39+
IndexMetadata.Builder builder = IndexMetadata.builder("test")
40+
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
41+
.numberOfShards(numPhysicalShards)
42+
.numberOfReplicas(1);
43+
builder.putCustom(VirtualShardRoutingHelper.VIRTUAL_SHARDS_CUSTOM_METADATA_KEY, overrides);
44+
45+
IndexMetadata metadata = builder.build();
46+
47+
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
48+
49+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 0));
50+
}
51+
52+
public void testInvalidOverridesFallBackToModulo() {
53+
int numPhysicalShards = 5;
54+
Map<String, String> overrides = new HashMap<>();
55+
overrides.put("7", "not_a_number");
56+
overrides.put("8", "-1");
57+
overrides.put("9", "5");
58+
59+
IndexMetadata.Builder builder = IndexMetadata.builder("test")
60+
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
61+
.numberOfShards(numPhysicalShards)
62+
.numberOfReplicas(1);
63+
builder.putCustom(VirtualShardRoutingHelper.VIRTUAL_SHARDS_CUSTOM_METADATA_KEY, overrides);
64+
65+
IndexMetadata metadata = builder.build();
66+
67+
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
68+
assertEquals(3, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 8));
69+
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 9));
70+
}
71+
}
server/src/test/java/org/opensearch/cluster/routing/VirtualShardOperationRoutingTests.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
public class VirtualShardOperationRoutingTests extends OpenSearchTestCase {
17+
18+
public void testGenerateVirtualShardId() {
19+
int numPhysicalShards = 4;
20+
int numVirtualShards = 16;
21+
22+
IndexMetadata metadata = IndexMetadata.builder("test")
23+
.settings(
24+
Settings.builder()
25+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
26+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, numVirtualShards)
27+
)
28+
.numberOfShards(numPhysicalShards)
29+
.numberOfReplicas(1)
30+
.build();
31+
32+
String routing = "user1";
33+
int routingHash = Murmur3HashFunction.hash(routing);
34+
int expectedVShard = Math.floorMod(routingHash, numVirtualShards);
35+
int expectedPShard = Math.floorMod(expectedVShard, numPhysicalShards);
36+
37+
assertEquals(expectedPShard, OperationRouting.generateShardId(metadata, "doc1", routing));
38+
}
39+
40+
public void testVirtualShardDisabledUsesLegacy() {
41+
int numPhysicalShards = 4;
42+
43+
IndexMetadata metadata = IndexMetadata.builder("test")
44+
.settings(
45+
Settings.builder()
46+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
47+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, -1)
48+
)
49+
.numberOfShards(numPhysicalShards)
50+
.numberOfReplicas(1)
51+
.build();
52+
53+
String routing = "user1";
54+
int legacyHash = Murmur3HashFunction.hash(routing);
55+
int expectedLegacyShard = Math.floorMod(legacyHash, numPhysicalShards);
56+
57+
assertEquals(expectedLegacyShard, OperationRouting.generateShardId(metadata, "doc1", routing));
58+
}
59+
60+
public void testVirtualShardValidation() {
61+
int numPhysicalShards = 10;
62+
int numVirtualShards = 5;
63+
64+
IllegalArgumentException e = expectThrows(
65+
IllegalArgumentException.class,
66+
() -> IndexMetadata.builder("test")
67+
.settings(
68+
Settings.builder()
69+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
70+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, numVirtualShards)
71+
)
72+
.numberOfShards(numPhysicalShards)
73+
.numberOfReplicas(1)
74+
.build()
75+
);
76+
77+
assertTrue(e.getMessage().contains("must be >= number of shards"));
78+
}
79+
80+
public void testVirtualShardWithRoutingPartition() {
81+
int numPhysicalShards = 4;
82+
int numVirtualShards = 16;
83+
int partitionSize = 2;
84+
85+
IndexMetadata metadata = IndexMetadata.builder("test")
86+
.settings(
87+
Settings.builder()
88+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
89+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, numVirtualShards)
90+
)
91+
.routingPartitionSize(partitionSize)
92+
.numberOfShards(numPhysicalShards)
93+
.numberOfReplicas(1)
94+
.build();
95+
96+
String id = "doc1";
97+
String routing = "user1";
98+
99+
int partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), partitionSize);
100+
int routingHash = Murmur3HashFunction.hash(routing) + partitionOffset;
101+
int expectedVShard = Math.floorMod(routingHash, numVirtualShards);
102+
int expectedPShard = Math.floorMod(expectedVShard, numPhysicalShards);
103+
104+
assertEquals(expectedPShard, OperationRouting.generateShardId(metadata, id, routing));
105+
}
106+
}

0 commit comments

Comments
 (0)