Skip to content

Commit 5f0e98c

Browse files
committed
Virtual shard extract-merge groundwork
Add the Phase 2 storage primitive for virtual shards: extracting the documents of a target vShard into a standalone Lucene index.
- Add VirtualShardFilteredMergePolicy (filter-on-write extraction).
- Add IndexShard#extractVirtualShard(int, Path) with validation.
- Add VirtualShardRoutingHelper#computeVirtualShardId and use it from OperationRouting to keep routing/extraction parity.
Also address the review findings raised after the Phase 1 PR.
Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent a756e88 commit 5f0e98c

File tree

8 files changed

+777
-10
lines changed

8 files changed

+777
-10
lines changed

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,7 @@ public Iterator<Setting<?>> settings() {
10691069
private final int routingFactor;
10701070
private final int routingPartitionSize;
10711071

1072+
private final int numberOfVirtualShards;
10721073
private final int numberOfShards;
10731074
private final int numberOfReplicas;
10741075
private final int numberOfSearchOnlyReplicas;
@@ -1168,6 +1169,7 @@ private IndexMetadata(
11681169
this.primaryTerms = primaryTerms;
11691170
assert primaryTerms.length == numberOfShards;
11701171
this.state = state;
1172+
this.numberOfVirtualShards = INDEX_NUMBER_OF_VIRTUAL_SHARDS_SETTING.get(settings);
11711173
this.numberOfShards = numberOfShards;
11721174
this.numberOfReplicas = numberOfReplicas;
11731175
this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
@@ -1348,7 +1350,7 @@ public int getNumberOfShards() {
13481350
* @return the number of virtual shards or -1
13491351
*/
13501352
public int getNumberOfVirtualShards() {
1351-
return settings.getAsInt(SETTING_NUMBER_OF_VIRTUAL_SHARDS, -1);
1353+
return numberOfVirtualShards;
13521354
}
13531355

13541356
public int getNumberOfReplicas() {

server/src/main/java/org/opensearch/cluster/metadata/VirtualShardRoutingHelper.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13-
import org.opensearch.common.annotation.PublicApi;
13+
import org.opensearch.cluster.routing.Murmur3HashFunction;
1414

1515
import java.util.Map;
1616

1717
/**
1818
* Resolves virtual shard routing to physical shard IDs.
1919
*/
20-
@PublicApi(since = "3.6.0")
2120
public final class VirtualShardRoutingHelper {
2221

2322
private VirtualShardRoutingHelper() {}
@@ -29,6 +28,26 @@ private VirtualShardRoutingHelper() {}
2928
*/
3029
public static final String VIRTUAL_SHARDS_CUSTOM_METADATA_KEY = "virtual_shards_routing";
3130

31+
/**
32+
* Computes the virtual shard id for a document given its id and optional routing.
33+
* Replicates the hash chain used by {@code OperationRouting.generateShardId},
34+
* including partition offset for routing-partitioned indices.
35+
*
36+
* @throws IllegalStateException if virtual shards are not enabled on the index
37+
*/
38+
public static int computeVirtualShardId(IndexMetadata indexMetadata, String id, String routing) {
39+
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
40+
if (numVirtualShards <= 0) {
41+
throw new IllegalStateException("virtual shards are not enabled on index [" + indexMetadata.getIndex() + "]");
42+
}
43+
String effectiveRouting = (routing != null) ? routing : id;
44+
int partitionOffset = indexMetadata.isRoutingPartitionedIndex()
45+
? Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize())
46+
: 0;
47+
int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
48+
return Math.floorMod(hash, numVirtualShards);
49+
}
50+
3251
/**
3352
* Resolves the physical shard for a virtual shard id.
3453
*/

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,12 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
505505
}
506506

507507
public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
508+
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
509+
if (numVirtualShards != -1) {
510+
int vShardId = VirtualShardRoutingHelper.computeVirtualShardId(indexMetadata, id, routing);
511+
return VirtualShardRoutingHelper.resolvePhysicalShardId(indexMetadata, vShardId);
512+
}
513+
508514
final String effectiveRouting;
509515
final int partitionOffset;
510516

@@ -522,13 +528,6 @@ public static int generateShardId(IndexMetadata indexMetadata, @Nullable String
522528
partitionOffset = 0;
523529
}
524530

525-
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
526-
if (numVirtualShards != -1) {
527-
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
528-
int vShardId = Math.floorMod(hash, numVirtualShards);
529-
return VirtualShardRoutingHelper.resolvePhysicalShardId(indexMetadata, vShardId);
530-
}
531-
532531
return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
533532
}
534533

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.merge;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.index.CodecReader;
14+
import org.apache.lucene.index.FieldInfo;
15+
import org.apache.lucene.index.FilterCodecReader;
16+
import org.apache.lucene.index.FilterLeafReader;
17+
import org.apache.lucene.index.IndexWriter;
18+
import org.apache.lucene.index.IndexWriterConfig;
19+
import org.apache.lucene.index.LeafReader;
20+
import org.apache.lucene.index.LeafReaderContext;
21+
import org.apache.lucene.index.SegmentReader;
22+
import org.apache.lucene.index.StoredFieldVisitor;
23+
import org.apache.lucene.index.StoredFields;
24+
import org.apache.lucene.store.Directory;
25+
import org.apache.lucene.store.FSDirectory;
26+
import org.apache.lucene.util.Bits;
27+
import org.opensearch.cluster.metadata.IndexMetadata;
28+
import org.opensearch.cluster.metadata.VirtualShardRoutingHelper;
29+
import org.opensearch.index.mapper.IdFieldMapper;
30+
import org.opensearch.index.mapper.RoutingFieldMapper;
31+
import org.opensearch.index.mapper.Uid;
32+
import org.opensearch.index.shard.IndexShard;
33+
34+
import java.io.IOException;
35+
import java.nio.file.Path;
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
39+
/**
40+
* Utility for extracting documents for a target virtual shard.
41+
*
42+
* @opensearch.internal
43+
*/
44+
public final class VirtualShardFilteredMergePolicy {
45+
46+
private static final Logger logger = LogManager.getLogger(VirtualShardFilteredMergePolicy.class);
47+
private static final int MAX_UNWRAP_DEPTH = 100;
48+
49+
private VirtualShardFilteredMergePolicy() {}
50+
51+
/**
52+
* Extracts documents for a target virtual shard.
53+
* Note: The index at {@code outputPath} will be recreated/overwritten.
54+
*/
55+
public static void isolateVirtualShard(IndexShard shard, IndexMetadata indexMetadata, int vShardId, Path outputPath)
56+
throws IOException {
57+
58+
try (var searcher = shard.acquireSearcher("vshard_extract")) {
59+
List<CodecReader> wrappedReaders = new ArrayList<>();
60+
org.apache.lucene.codecs.Codec codec = null;
61+
62+
for (LeafReaderContext ctx : searcher.getIndexReader().leaves()) {
63+
LeafReader leafReader = ctx.reader();
64+
CodecReader codecReader = unwrapToCodecReader(leafReader);
65+
if (codecReader == null) {
66+
logger.trace("skipping leaf [{}]: unable to unwrap to CodecReader", leafReader.getClass());
67+
continue;
68+
}
69+
70+
if (codec == null && codecReader instanceof SegmentReader) {
71+
codec = ((SegmentReader) codecReader).getSegmentInfo().info.getCodec();
72+
}
73+
74+
Bits sourceLiveDocs = leafReader.getLiveDocs();
75+
wrappedReaders.add(new VirtualShardFilterReader(codecReader, sourceLiveDocs, indexMetadata, vShardId));
76+
}
77+
78+
IndexWriterConfig iwc = new IndexWriterConfig(null);
79+
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
80+
if (codec != null) {
81+
iwc.setCodec(codec);
82+
}
83+
84+
try (Directory outDir = FSDirectory.open(outputPath); IndexWriter writer = new IndexWriter(outDir, iwc)) {
85+
if (!wrappedReaders.isEmpty()) {
86+
writer.addIndexes(wrappedReaders.toArray(new CodecReader[0]));
87+
writer.forceMerge(1);
88+
}
89+
writer.commit();
90+
}
91+
}
92+
}
93+
94+
private static CodecReader unwrapToCodecReader(LeafReader reader) {
95+
LeafReader current = reader;
96+
for (int i = 0; i < MAX_UNWRAP_DEPTH; i++) {
97+
if (current instanceof CodecReader) {
98+
return (CodecReader) current;
99+
}
100+
if (current instanceof FilterLeafReader) {
101+
current = ((FilterLeafReader) current).getDelegate();
102+
} else {
103+
break;
104+
}
105+
}
106+
if (current instanceof CodecReader) {
107+
return (CodecReader) current;
108+
}
109+
logger.error("failed to unwrap to CodecReader after {} iterations, terminal type: {}", MAX_UNWRAP_DEPTH, current.getClass());
110+
return null;
111+
}
112+
113+
static final class VirtualShardFilterReader extends FilterCodecReader {
114+
private final Bits liveDocs;
115+
private final int numDocs;
116+
117+
VirtualShardFilterReader(CodecReader in, Bits sourceLiveDocs, IndexMetadata indexMetadata, int targetVShardId) throws IOException {
118+
super(in);
119+
120+
int maxDoc = in.maxDoc();
121+
int count = 0;
122+
boolean[] membership = new boolean[maxDoc];
123+
124+
StoredFields storedFields = in.storedFields();
125+
for (int docId = 0; docId < maxDoc; docId++) {
126+
if (sourceLiveDocs != null && !sourceLiveDocs.get(docId)) {
127+
continue;
128+
}
129+
VShardFieldVisitor visitor = new VShardFieldVisitor();
130+
storedFields.document(docId, visitor);
131+
if (visitor.id == null) {
132+
continue;
133+
}
134+
int computedVShard = VirtualShardRoutingHelper.computeVirtualShardId(indexMetadata, visitor.id, visitor.routing);
135+
if (computedVShard == targetVShardId) {
136+
membership[docId] = true;
137+
count++;
138+
}
139+
}
140+
141+
final boolean[] membershipFinal = membership;
142+
final Bits srcLive = sourceLiveDocs;
143+
this.numDocs = count;
144+
this.liveDocs = new Bits() {
145+
@Override
146+
public boolean get(int index) {
147+
if (srcLive != null && !srcLive.get(index)) {
148+
return false;
149+
}
150+
return membershipFinal[index];
151+
}
152+
153+
@Override
154+
public int length() {
155+
return maxDoc;
156+
}
157+
};
158+
}
159+
160+
@Override
161+
public Bits getLiveDocs() {
162+
return liveDocs;
163+
}
164+
165+
@Override
166+
public int numDocs() {
167+
return numDocs;
168+
}
169+
170+
@Override
171+
public CacheHelper getCoreCacheHelper() {
172+
return null;
173+
}
174+
175+
@Override
176+
public CacheHelper getReaderCacheHelper() {
177+
return null;
178+
}
179+
}
180+
181+
private static final class VShardFieldVisitor extends StoredFieldVisitor {
182+
String id;
183+
String routing;
184+
private int leftToVisit = 2;
185+
186+
@Override
187+
public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
188+
if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
189+
id = Uid.decodeId(value);
190+
}
191+
}
192+
193+
@Override
194+
public void stringField(FieldInfo fieldInfo, String value) throws IOException {
195+
if (RoutingFieldMapper.NAME.equals(fieldInfo.name)) {
196+
routing = value;
197+
} else if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
198+
id = value;
199+
}
200+
}
201+
202+
@Override
203+
public Status needsField(FieldInfo fieldInfo) {
204+
switch (fieldInfo.name) {
205+
case IdFieldMapper.NAME:
206+
case RoutingFieldMapper.NAME:
207+
leftToVisit--;
208+
return Status.YES;
209+
default:
210+
return leftToVisit == 0 ? Status.STOP : Status.NO;
211+
}
212+
}
213+
}
214+
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@
160160
import org.opensearch.index.mapper.Uid;
161161
import org.opensearch.index.merge.MergeStats;
162162
import org.opensearch.index.merge.MergedSegmentTransferTracker;
163+
import org.opensearch.index.merge.VirtualShardFilteredMergePolicy;
163164
import org.opensearch.index.recovery.RecoveryStats;
164165
import org.opensearch.index.refresh.RefreshStats;
165166
import org.opensearch.index.remote.RemoteSegmentStats;
@@ -1693,6 +1694,24 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
16931694
);
16941695
}
16951696

1697+
/**
1698+
* Extracts documents belonging to the given virtual shard into a standalone index at {@code outputPath}.
1699+
*/
1700+
public void extractVirtualShard(int vShardId, java.nio.file.Path outputPath) throws IOException {
1701+
verifyNotClosed();
1702+
IndexMetadata indexMetadata = indexSettings.getIndexMetadata();
1703+
int numVirtualShards = indexMetadata.getNumberOfVirtualShards();
1704+
if (numVirtualShards <= 0) {
1705+
throw new IllegalStateException("virtual shards are not enabled on index [" + shardId.getIndex() + "]");
1706+
}
1707+
if (vShardId < 0 || vShardId >= numVirtualShards) {
1708+
throw new IllegalArgumentException(
1709+
"vShardId [" + vShardId + "] out of range [0, " + numVirtualShards + ") for index [" + shardId.getIndex() + "]"
1710+
);
1711+
}
1712+
VirtualShardFilteredMergePolicy.isolateVirtualShard(this, indexMetadata, vShardId, outputPath);
1713+
}
1714+
16961715
/**
16971716
* Upgrades the shard to the current version of Lucene and returns the minimum segment version
16981717
*/

server/src/test/java/org/opensearch/cluster/metadata/VirtualShardRoutingHelperTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,51 @@ public void testResolvePhysicalShardIdOutOfBoundsNormalization() {
120120
assertEquals(0, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 20));
121121
assertEquals(1, VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, 25));
122122
}
123+
124+
public void testComputeVirtualShardId() {
125+
IndexMetadata metadata = IndexMetadata.builder("test")
126+
.settings(
127+
Settings.builder()
128+
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
129+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
130+
)
131+
.numberOfShards(5)
132+
.numberOfReplicas(1)
133+
.build();
134+
135+
int vShardId = VirtualShardRoutingHelper.computeVirtualShardId(metadata, "test_doc", null);
136+
assertTrue(vShardId >= 0 && vShardId < 20);
137+
138+
assertEquals(vShardId, VirtualShardRoutingHelper.computeVirtualShardId(metadata, "test_doc", null));
139+
}
140+
141+
public void testComputeVirtualShardIdDisabledThrows() {
142+
IndexMetadata metadata = IndexMetadata.builder("test")
143+
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT))
144+
.numberOfShards(5)
145+
.numberOfReplicas(1)
146+
.build();
147+
148+
expectThrows(IllegalStateException.class, () -> VirtualShardRoutingHelper.computeVirtualShardId(metadata, "doc1", null));
149+
}
150+
151+
public void testComputeVirtualShardIdRoutingParity() {
152+
IndexMetadata metadata = IndexMetadata.builder("test")
153+
.settings(
154+
Settings.builder()
155+
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
156+
.put(IndexMetadata.SETTING_NUMBER_OF_VIRTUAL_SHARDS, 20)
157+
)
158+
.numberOfShards(5)
159+
.numberOfReplicas(1)
160+
.build();
161+
162+
for (int i = 0; i < 100; i++) {
163+
String id = "doc_" + i;
164+
int vShardId = VirtualShardRoutingHelper.computeVirtualShardId(metadata, id, null);
165+
int physicalFromHelper = VirtualShardRoutingHelper.resolvePhysicalShardId(metadata, vShardId);
166+
int physicalFromRouting = org.opensearch.cluster.routing.OperationRouting.generateShardId(metadata, id, null);
167+
assertEquals("Routing parity broken for id=" + id, physicalFromRouting, physicalFromHelper);
168+
}
169+
}
123170
}

0 commit comments

Comments
 (0)