Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,26 @@ static Setting<Integer> buildNumberOfShardsSetting() {

public static final String INDEX_SETTING_PREFIX = "index.";
public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards";
public static final String SETTING_NUMBER_OF_VIRTUAL_SHARDS = "index.number_of_virtual_shards";
static final String DEFAULT_NUMBER_OF_SHARDS = "opensearch.index.default_number_of_shards";
static final String MAX_NUMBER_OF_SHARDS = "opensearch.index.max_number_of_shards";
public static final Setting<Integer> INDEX_NUMBER_OF_SHARDS_SETTING = buildNumberOfShardsSetting();
/**
* Settings for configuring the number of virtual shards on an index.
* <p>
* Note: Virtual Shards uses range-based mapping (e.g., {@code vShardId / (V / P)}) to route to
* physical shards instead of default modulo hashing. This ensures contiguous bucket mapping
* and safe physical shard merging operations later in the lifecycle. The number of virtual shards
* must be a multiple of the number of physical shards {@code (V % P == 0)}.
*/
public static final Setting<Integer> INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING = Setting.intSetting(
SETTING_NUMBER_OF_VIRTUAL_SHARDS,
-1,
-1,
Property.IndexScope,
Property.Final
);

public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final Setting<Integer> INDEX_NUMBER_OF_REPLICAS_SETTING = Setting.intSetting(
SETTING_NUMBER_OF_REPLICAS,
Expand Down Expand Up @@ -1324,6 +1341,16 @@ public int getNumberOfShards() {
return numberOfShards;
}

/**
* Returns the number of virtual shards for this index.
* Returns -1 if virtual shards are disabled.
*
* @return the number of virtual shards or -1
*/
public int getNumberOfVirtualShards() {
return settings.getAsInt(SETTING_NUMBER_OF_VIRTUAL_SHARDS, -1);
}

public int getNumberOfReplicas() {
return numberOfReplicas;
}
Expand Down Expand Up @@ -2190,6 +2217,32 @@ public IndexMetadata build() {
);
}

final int numberOfVirtualShards = INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING.get(settings);
if (numberOfVirtualShards != -1) {
if (numberOfVirtualShards < numberOfShards) {
throw new IllegalArgumentException(
"number of virtual shards ["
+ numberOfVirtualShards
+ "] must be >= number of shards ["
+ numberOfShards
+ "] for ["
+ index
+ "]"
);
}
if (numberOfVirtualShards % numberOfShards != 0) {
throw new IllegalArgumentException(
"number of virtual shards ["
+ numberOfVirtualShards
+ "] must be a multiple of number of shards ["
+ numberOfShards
+ "] for ["
+ index
+ "]"
);
}
}

// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
final Map<Integer, Set<String>> filledInSyncAllocationIds = new HashMap<>();
for (int i = 0; i < numberOfShards; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.PublicApi;

import java.util.Map;

/**
* Resolves virtual shard routing to physical shard IDs.
*/
@PublicApi(since = "3.6.0")
public final class VirtualShardRoutingHelper {

private VirtualShardRoutingHelper() {}

private static final Logger logger = LogManager.getLogger(VirtualShardRoutingHelper.class);

/**
* Custom Metadata key for storing virtual shard routing overrides.
*/
public static final String VIRTUAL_SHARDS_CUSTOM_METADATA_KEY = "virtual_shards_routing";

/**
* Resolves the physical shard for a virtual shard id.
*/
public static int resolvePhysicalShardId(IndexMetadata indexMetadata, int vShardId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would be the access pattern, would it be called during the context of every request or cached?

int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
int numPhysicalShards = indexMetadata.getNumberOfShards();

if (numVirtualShards < numPhysicalShards || numVirtualShards % numPhysicalShards != 0) {
throw new IllegalArgumentException(
"Virtual shards must be enabled and be a multiple of the number of physical shards to resolve routing."
);
}

vShardId = Math.floorMod(vShardId, numVirtualShards);

Map<String, String> overrides = indexMetadata.getCustomData(VIRTUAL_SHARDS_CUSTOM_METADATA_KEY);
if (overrides != null) {
String pShardIdStr = overrides.get(String.valueOf(vShardId));
if (pShardIdStr != null) {
try {
int pShardId = Integer.parseInt(pShardIdStr);
if (pShardId >= 0 && pShardId < numPhysicalShards) {
return pShardId;
}
logger.trace("Invalid override value [{}] for vShard [{}]: out of bounds", pShardId, vShardId);
} catch (NumberFormatException e) {
logger.trace("Invalid override value [{}] for vShard [{}]: not a number", pShardIdStr, vShardId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we swallowing the exception?

}
}
}

int virtualShardsPerPhysical = numVirtualShards / numPhysicalShards;
return vShardId / virtualShardsPerPhysical;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.VirtualShardRoutingHelper;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
Expand Down Expand Up @@ -521,6 +522,13 @@ public static int generateShardId(IndexMetadata indexMetadata, @Nullable String
partitionOffset = 0;
}

int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
if (numVirtualShards != -1) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
int vShardId = Math.floorMod(hash, numVirtualShards);
return VirtualShardRoutingHelper.resolvePhysicalShardId(indexMetadata, vShardId);
}

return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING,
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING,
IndexMetadata.INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING,
IndexMetadata.INDEX_READ_ONLY_SETTING,
IndexMetadata.INDEX_BLOCKS_READ_SETTING,
IndexMetadata.INDEX_BLOCKS_WRITE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.Version;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
import java.util.Map;

public class VirtualShardRoutingHelperTests extends OpenSearchTestCase {

public void testResolvePhysicalShardIdDefaultRangeBased() {
int numPhysicalShards = 5;
IndexMetadata metadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
)
.numberOfShards(numPhysicalShards)
.numberOfReplicas(1)
.build();

assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 0));
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 3));
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 4));
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 19));
}

public void testResolvePhysicalShardIdWithOverrides() {
int numPhysicalShards = 5;
Map<String, String> overrides = new HashMap<>();
overrides.put("7", "1");
overrides.put("8", "2");

IndexMetadata.Builder builder = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
)
.numberOfShards(numPhysicalShards)
.numberOfReplicas(1);
builder.putCustom(VirtualShardRoutingHelper.VIRTUAL_SHARDS_CUSTOM_METADATA_KEY, overrides);

IndexMetadata metadata = builder.build();

assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 8));

assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 0));
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 9));
}

public void testInvalidOverridesFallBackToRangeBased() {
int numPhysicalShards = 5;
Map<String, String> overrides = new HashMap<>();
overrides.put("7", "not_a_number");
overrides.put("8", "-1");
overrides.put("19", "5");

IndexMetadata.Builder builder = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
)
.numberOfShards(numPhysicalShards)
.numberOfReplicas(1);
builder.putCustom(VirtualShardRoutingHelper.VIRTUAL_SHARDS_CUSTOM_METADATA_KEY, overrides);

IndexMetadata metadata = builder.build();

assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 7));
assertEquals(2, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 8));
assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 19));
}

public void testResolvePhysicalShardIdInvalidConfigurations() {
int numPhysicalShards = 5;

IndexMetadata metadataDisabled = org.mockito.Mockito.mock(IndexMetadata.class);
org.mockito.Mockito.when(metadataDisabled.getNumberOfVirtualShards()).thenReturn(-1);
org.mockito.Mockito.when(metadataDisabled.getNumberOfShards()).thenReturn(numPhysicalShards);

IllegalArgumentException e1 = expectThrows(
IllegalArgumentException.class,
() -> VirtualShardRoutingHelper.resolvePhysicalShardId(metadataDisabled, 0)
);
assertTrue(e1.getMessage().contains("must be enabled and be a multiple"));

IndexMetadata metadataInvalid = org.mockito.Mockito.mock(IndexMetadata.class);
org.mockito.Mockito.when(metadataInvalid.getNumberOfVirtualShards()).thenReturn(13);
org.mockito.Mockito.when(metadataInvalid.getNumberOfShards()).thenReturn(numPhysicalShards);

IllegalArgumentException e2 = expectThrows(
IllegalArgumentException.class,
() -> VirtualShardRoutingHelper.resolvePhysicalShardId(metadataInvalid, 0)
);
assertTrue(e2.getMessage().contains("must be enabled and be a multiple"));
}

public void testResolvePhysicalShardIdOutOfBoundsNormalization() {
int numPhysicalShards = 5;
IndexMetadata metadata = org.mockito.Mockito.mock(IndexMetadata.class);
org.mockito.Mockito.when(metadata.getNumberOfVirtualShards()).thenReturn(20);
org.mockito.Mockito.when(metadata.getNumberOfShards()).thenReturn(numPhysicalShards);

assertEquals(4, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, -1));

assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 20));
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 25));
}
}
Loading
Loading