Skip to content

Commit a756e88

Browse files
authored
Add Initial Routing and Metadata Groundwork for Virtual Shards (opensearch-project#20745)
* Add Virtual Shards routing and mapping overrides Implements the initial routing foundation for Virtual Shards. Adds settings validation for virtual shards and routing overrides via IndexMetadata custom data. Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Add Virtual Shards routing and mapping overrides This PR adds the initial routing and metadata groundwork for virtual shards. When enabled, routing now uses a virtual shard id (vShardId) before resolving to a physical shard. This separates hash-space size from physical shard count and prepares the path for future shard movement workflows. What’s included - New static index setting: index.number_of_virtual_shards (default -1, disabled). - Routing update in OperationRouting.generateShardId: - virtual-shard path when enabled, - existing behavior unchanged when disabled. - New VirtualShardRoutingHelper for vShardId -> physical shard resolution. - Optional per-index override support via virtual_shards_routing custom metadata. - Test coverage for: - enabled/disabled routing behavior, - validation rules, - override and fallback behavior. Out of scope - Side-car segment extraction flow. - Transport/state-orchestration for managing override lifecycle. Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Add Virtual Shards routing and mapping overrides This PR adds the initial routing and metadata groundwork for virtual shards. When enabled, routing now uses a virtual shard id (vShardId) before resolving to a physical shard. This separates hash-space size from physical shard count and prepares the path for future shard movement workflows. What’s included - New static index setting: index.number_of_virtual_shards (default -1, disabled). - Routing update in OperationRouting.generateShardId: - virtual-shard path when enabled, - existing behavior unchanged when disabled. - New VirtualShardRoutingHelper for vShardId -> physical shard resolution. - Optional per-index override support via virtual_shards_routing custom metadata. - Test coverage for: - enabled/disabled routing behavior, - validation rules, - override and fallback behavior. Out of scope - Side-car segment extraction flow. - Transport/state-orchestration for managing override lifecycle. Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Refactor Virtual Shards to use range-based routing Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Add fail-safe to VirtualShardRoutingHelper and valid configuration tracking test Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Fixed review comments and move to range based sharding Signed-off-by: Atri Sharma <atri.jiit@gmail.com> --------- Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent e52e135 commit a756e88

File tree

7 files changed

+384
-0
lines changed

7 files changed

+384
-0
lines changed

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,26 @@ static Setting<Integer> buildNumberOfShardsSetting() {
264264

265265
public static final String INDEX_SETTING_PREFIX = "index.";
266266
public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards";
267+
public static final String SETTING_NUMBER_OF_VIRTUAL_SHARDS = "index.number_of_virtual_shards";
267268
static final String DEFAULT_NUMBER_OF_SHARDS = "opensearch.index.default_number_of_shards";
268269
static final String MAX_NUMBER_OF_SHARDS = "opensearch.index.max_number_of_shards";
269270
public static final Setting<Integer> INDEX_NUMBER_OF_SHARDS_SETTING = buildNumberOfShardsSetting();
271+
/**
272+
* Settings for configuring the number of virtual shards on an index.
273+
* <p>
274+
* Note: Virtual Shards uses range-based mapping (e.g., {@code vShardId / (V / P)}) to route to
275+
* physical shards instead of default modulo hashing. This ensures contiguous bucket mapping
276+
* and safe physical shard merging operations later in the lifecycle. The number of virtual shards
277+
* must be a multiple of the number of physical shards {@code (V % P == 0)}.
278+
*/
279+
public static final Setting<Integer> INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING = Setting.intSetting(
280+
SETTING_NUMBER_OF_VIRTUAL_SHARDS,
281+
-1,
282+
-1,
283+
Property.IndexScope,
284+
Property.Final
285+
);
286+
270287
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
271288
public static final Setting<Integer> INDEX_NUMBER_OF_REPLICAS_SETTING = Setting.intSetting(
272289
SETTING_NUMBER_OF_REPLICAS,
@@ -1324,6 +1341,16 @@ public int getNumberOfShards() {
13241341
return numberOfShards;
13251342
}
13261343

1344+
/**
1345+
* Returns the number of virtual shards for this index.
1346+
* Returns -1 if virtual shards are disabled.
1347+
*
1348+
* @return the number of virtual shards or -1
1349+
*/
1350+
public int getNumberOfVirtualShards() {
1351+
return settings.getAsInt(SETTING_NUMBER_OF_VIRTUAL_SHARDS, -1);
1352+
}
1353+
13271354
public int getNumberOfReplicas() {
13281355
return numberOfReplicas;
13291356
}
@@ -2190,6 +2217,32 @@ public IndexMetadata build() {
21902217
);
21912218
}
21922219

2220+
final int numberOfVirtualShards = INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING.get(settings);
2221+
if (numberOfVirtualShards != -1) {
2222+
if (numberOfVirtualShards < numberOfShards) {
2223+
throw new IllegalArgumentException(
2224+
"number of virtual shards ["
2225+
+ numberOfVirtualShards
2226+
+ "] must be >= number of shards ["
2227+
+ numberOfShards
2228+
+ "] for ["
2229+
+ index
2230+
+ "]"
2231+
);
2232+
}
2233+
if (numberOfVirtualShards % numberOfShards != 0) {
2234+
throw new IllegalArgumentException(
2235+
"number of virtual shards ["
2236+
+ numberOfVirtualShards
2237+
+ "] must be a multiple of number of shards ["
2238+
+ numberOfShards
2239+
+ "] for ["
2240+
+ index
2241+
+ "]"
2242+
);
2243+
}
2244+
}
2245+
21932246
// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
21942247
final Map<Integer, Set<String>> filledInSyncAllocationIds = new HashMap<>();
21952248
for (int i = 0; i < numberOfShards; i++) {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.annotation.PublicApi;
14+
15+
import java.util.Map;
16+
17+
/**
18+
* Resolves virtual shard routing to physical shard IDs.
19+
*/
20+
@PublicApi(since = "3.6.0")
21+
public final class VirtualShardRoutingHelper {
22+
23+
private VirtualShardRoutingHelper() {}
24+
25+
private static final Logger logger = LogManager.getLogger(VirtualShardRoutingHelper.class);
26+
27+
/**
28+
* Custom Metadata key for storing virtual shard routing overrides.
29+
*/
30+
public static final String VIRTUAL_SHARDS_CUSTOM_METADATA_KEY = "virtual_shards_routing";
31+
32+
/**
33+
* Resolves the physical shard for a virtual shard id.
34+
*/
35+
public static int resolvePhysicalShardId(IndexMetadata indexMetadata, int vShardId) {
36+
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
37+
int numPhysicalShards = indexMetadata.getNumberOfShards();
38+
39+
if (numVirtualShards < numPhysicalShards || numVirtualShards % numPhysicalShards != 0) {
40+
throw new IllegalArgumentException(
41+
"Virtual shards must be enabled and be a multiple of the number of physical shards to resolve routing."
42+
);
43+
}
44+
45+
vShardId = Math.floorMod(vShardId, numVirtualShards);
46+
47+
Map<String, String> overrides = indexMetadata.getCustomData(VIRTUAL_SHARDS_CUSTOM_METADATA_KEY);
48+
if (overrides != null) {
49+
String pShardIdStr = overrides.get(String.valueOf(vShardId));
50+
if (pShardIdStr != null) {
51+
try {
52+
int pShardId = Integer.parseInt(pShardIdStr);
53+
if (pShardId >= 0 && pShardId < numPhysicalShards) {
54+
return pShardId;
55+
}
56+
logger.trace("Invalid override value [{}] for vShard [{}]: out of bounds", pShardId, vShardId);
57+
} catch (NumberFormatException e) {
58+
logger.trace("Invalid override value [{}] for vShard [{}]: not a number", pShardIdStr, vShardId);
59+
}
60+
}
61+
}
62+
63+
int virtualShardsPerPhysical = numVirtualShards / numPhysicalShards;
64+
return vShardId / virtualShardsPerPhysical;
65+
}
66+
}

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.util.CollectionUtil;
3636
import org.opensearch.cluster.ClusterState;
3737
import org.opensearch.cluster.metadata.IndexMetadata;
38+
import org.opensearch.cluster.metadata.VirtualShardRoutingHelper;
3839
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
3940
import org.opensearch.cluster.node.DiscoveryNodes;
4041
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
@@ -521,6 +522,13 @@ public static int generateShardId(IndexMetadata indexMetadata, @Nullable String
521522
partitionOffset = 0;
522523
}
523524

525+
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
526+
if (numVirtualShards != -1) {
527+
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
528+
int vShardId = Math.floorMod(hash, numVirtualShards);
529+
return VirtualShardRoutingHelper.resolvePhysicalShardId(indexMetadata, vShardId);
530+
}
531+
524532
return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
525533
}
526534

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
105105
IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING,
106106
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
107107
IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING,
108+
IndexMetadata.INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING,
108109
IndexMetadata.INDEX_READ_ONLY_SETTING,
109110
IndexMetadata.INDEX_BLOCKS_READ_SETTING,
110111
IndexMetadata.INDEX_BLOCKS_WRITE_SETTING,
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
public class VirtualShardRoutingHelperTests extends OpenSearchTestCase {
19+
20+
public void testResolvePhysicalShardIdDefaultRangeBased() {
21+
int numPhysicalShards = 5;
22+
IndexMetadata metadata = IndexMetadata.builder("test")
23+
.settings(
24+
Settings.builder()
25+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
26+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
27+
)
28+
.numberOfShards(numPhysicalShards)
29+
.numberOfReplicas(1)
30+
.build();
31+
32+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 0));
33+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 3));
34+
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 4));
35+
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
36+
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 19));
37+
}
38+
39+
public void testResolvePhysicalShardIdWithOverrides() {
40+
int numPhysicalShards = 5;
41+
Map<String, String> overrides = new HashMap<>();
42+
overrides.put("7", "1");
43+
overrides.put("8", "2");
44+
45+
IndexMetadata.Builder builder = IndexMetadata.builder("test")
46+
.settings(
47+
Settings.builder()
48+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
49+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
50+
)
51+
.numberOfShards(numPhysicalShards)
52+
.numberOfReplicas(1);
53+
builder.putCustom(VirtualShardRoutingHelper.VIRTUAL_SHARDS_CUSTOM_METADATA_KEY, overrides);
54+
55+
IndexMetadata metadata = builder.build();
56+
57+
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
58+
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 8));
59+
60+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 0));
61+
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 9));
62+
}
63+
64+
public void testInvalidOverridesFallBackToRangeBased() {
65+
int numPhysicalShards = 5;
66+
Map<String, String> overrides = new HashMap<>();
67+
overrides.put("7", "not_a_number");
68+
overrides.put("8", "-1");
69+
overrides.put("19", "5");
70+
71+
IndexMetadata.Builder builder = IndexMetadata.builder("test")
72+
.settings(
73+
Settings.builder()
74+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
75+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
76+
)
77+
.numberOfShards(numPhysicalShards)
78+
.numberOfReplicas(1);
79+
builder.putCustom(VirtualShardRoutingHelper.VIRTUAL_SHARDS_CUSTOM_METADATA_KEY, overrides);
80+
81+
IndexMetadata metadata = builder.build();
82+
83+
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
84+
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 8));
85+
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 19));
86+
}
87+
88+
public void testResolvePhysicalShardIdInvalidConfigurations() {
89+
int numPhysicalShards = 5;
90+
91+
IndexMetadata metadataDisabled = org.mockito.Mockito.mock(IndexMetadata.class);
92+
org.mockito.Mockito.when(metadataDisabled.getNumberOfVirtualShards()).thenReturn(-1);
93+
org.mockito.Mockito.when(metadataDisabled.getNumberOfShards()).thenReturn(numPhysicalShards);
94+
95+
IllegalArgumentException e1 = expectThrows(
96+
IllegalArgumentException.class,
97+
() -> VirtualShardRoutingHelper.resolvePhysicalShardId(metadataDisabled, 0)
98+
);
99+
assertTrue(e1.getMessage().contains("must be enabled and be a multiple"));
100+
101+
IndexMetadata metadataInvalid = org.mockito.Mockito.mock(IndexMetadata.class);
102+
org.mockito.Mockito.when(metadataInvalid.getNumberOfVirtualShards()).thenReturn(13);
103+
org.mockito.Mockito.when(metadataInvalid.getNumberOfShards()).thenReturn(numPhysicalShards);
104+
105+
IllegalArgumentException e2 = expectThrows(
106+
IllegalArgumentException.class,
107+
() -> VirtualShardRoutingHelper.resolvePhysicalShardId(metadataInvalid, 0)
108+
);
109+
assertTrue(e2.getMessage().contains("must be enabled and be a multiple"));
110+
}
111+
112+
public void testResolvePhysicalShardIdOutOfBoundsNormalization() {
113+
int numPhysicalShards = 5;
114+
IndexMetadata metadata = org.mockito.Mockito.mock(IndexMetadata.class);
115+
org.mockito.Mockito.when(metadata.getNumberOfVirtualShards()).thenReturn(20);
116+
org.mockito.Mockito.when(metadata.getNumberOfShards()).thenReturn(numPhysicalShards);
117+
118+
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, -1));
119+
120+
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 20));
121+
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 25));
122+
}
123+
}

0 commit comments

Comments
 (0)