Skip to content

Commit 2a6e582

Browse files
committed
Introduce IndexReshardingMetadata
This adds to IndexMetadata the persistent state we will need to track while a split is in progress. Nothing outside test code sets it yet, so it introduces no wire changes. Follow-ups will consult this state to make routing decisions and to handle backward compatibility when the object is present in metadata.
1 parent a1b2f3d commit 2a6e582

File tree

4 files changed

+291
-7
lines changed

4 files changed

+291
-7
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,8 @@ public Iterator<Setting<?>> settings() {
561561

562562
public static final String KEY_INFERENCE_FIELDS = "field_inference";
563563

564+
public static final String KEY_RESHARDING = "resharding";
565+
564566
public static final String INDEX_STATE_FILE_PREFIX = "state-";
565567

566568
static final TransportVersion STATS_AND_FORECAST_ADDED = TransportVersions.V_8_6_0;
@@ -654,6 +656,8 @@ public Iterator<Setting<?>> settings() {
654656
private final Double writeLoadForecast;
655657
@Nullable
656658
private final Long shardSizeInBytesForecast;
659+
@Nullable
660+
private final IndexReshardingMetadata reshardingMetadata;
657661

658662
private IndexMetadata(
659663
final Index index,
@@ -702,7 +706,8 @@ private IndexMetadata(
702706
final IndexVersion indexCompatibilityVersion,
703707
@Nullable final IndexMetadataStats stats,
704708
@Nullable final Double writeLoadForecast,
705-
@Nullable Long shardSizeInBytesForecast
709+
@Nullable Long shardSizeInBytesForecast,
710+
@Nullable IndexReshardingMetadata reshardingMetadata
706711
) {
707712
this.index = index;
708713
this.version = version;
@@ -761,6 +766,7 @@ private IndexMetadata(
761766
this.writeLoadForecast = writeLoadForecast;
762767
this.shardSizeInBytesForecast = shardSizeInBytesForecast;
763768
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
769+
this.reshardingMetadata = reshardingMetadata;
764770
}
765771

766772
IndexMetadata withMappingMetadata(MappingMetadata mapping) {
@@ -814,7 +820,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
814820
this.indexCompatibilityVersion,
815821
this.stats,
816822
this.writeLoadForecast,
817-
this.shardSizeInBytesForecast
823+
this.shardSizeInBytesForecast,
824+
this.reshardingMetadata
818825
);
819826
}
820827

@@ -875,7 +882,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
875882
this.indexCompatibilityVersion,
876883
this.stats,
877884
this.writeLoadForecast,
878-
this.shardSizeInBytesForecast
885+
this.shardSizeInBytesForecast,
886+
this.reshardingMetadata
879887
);
880888
}
881889

@@ -934,7 +942,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
934942
this.indexCompatibilityVersion,
935943
this.stats,
936944
this.writeLoadForecast,
937-
this.shardSizeInBytesForecast
945+
this.shardSizeInBytesForecast,
946+
this.reshardingMetadata
938947
);
939948
}
940949

@@ -994,7 +1003,8 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind
9941003
this.indexCompatibilityVersion,
9951004
this.stats,
9961005
this.writeLoadForecast,
997-
this.shardSizeInBytesForecast
1006+
this.shardSizeInBytesForecast,
1007+
this.reshardingMetadata
9981008
);
9991009
}
10001010

@@ -1049,7 +1059,8 @@ public IndexMetadata withIncrementedVersion() {
10491059
this.indexCompatibilityVersion,
10501060
this.stats,
10511061
this.writeLoadForecast,
1052-
this.shardSizeInBytesForecast
1062+
this.shardSizeInBytesForecast,
1063+
this.reshardingMetadata
10531064
);
10541065
}
10551066

@@ -1905,6 +1916,7 @@ public static class Builder {
19051916
private IndexMetadataStats stats = null;
19061917
private Double indexWriteLoadForecast = null;
19071918
private Long shardSizeInBytesForecast = null;
1919+
private IndexReshardingMetadata reshardingMetadata = null;
19081920

19091921
public Builder(String index) {
19101922
this.index = index;
@@ -1940,6 +1952,7 @@ public Builder(IndexMetadata indexMetadata) {
19401952
this.stats = indexMetadata.stats;
19411953
this.indexWriteLoadForecast = indexMetadata.writeLoadForecast;
19421954
this.shardSizeInBytesForecast = indexMetadata.shardSizeInBytesForecast;
1955+
this.reshardingMetadata = indexMetadata.reshardingMetadata;
19431956
}
19441957

19451958
public Builder index(String index) {
@@ -2190,6 +2203,11 @@ public Builder putInferenceFields(Map<String, InferenceFieldMetadata> values) {
21902203
return this;
21912204
}
21922205

2206+
public Builder reshardingMetadata(IndexReshardingMetadata reshardingMetadata) {
2207+
this.reshardingMetadata = reshardingMetadata;
2208+
return this;
2209+
}
2210+
21932211
public IndexMetadata build() {
21942212
return build(false);
21952213
}
@@ -2389,7 +2407,8 @@ IndexMetadata build(boolean repair) {
23892407
SETTING_INDEX_VERSION_COMPATIBILITY.get(settings),
23902408
stats,
23912409
indexWriteLoadForecast,
2392-
shardSizeInBytesForecast
2410+
shardSizeInBytesForecast,
2411+
reshardingMetadata
23932412
);
23942413
}
23952414

@@ -2529,6 +2548,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
25292548
builder.endObject();
25302549
}
25312550

2551+
if (indexMetadata.reshardingMetadata != null) {
2552+
builder.startObject(KEY_RESHARDING);
2553+
indexMetadata.reshardingMetadata.toXContent(builder, params);
2554+
builder.endObject();
2555+
}
2556+
25322557
builder.endObject();
25332558
}
25342559

@@ -2615,6 +2640,9 @@ public static IndexMetadata fromXContent(XContentParser parser, Map<String, Mapp
26152640
builder.putInferenceField(InferenceFieldMetadata.fromXContent(parser));
26162641
}
26172642
break;
2643+
case KEY_RESHARDING:
2644+
builder.reshardingMetadata(IndexReshardingMetadata.fromXContent(parser));
2645+
break;
26182646
default:
26192647
// assume it's custom index metadata
26202648
builder.putCustom(currentFieldName, parser.mapStrings());
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.metadata;
11+
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.xcontent.ConstructingObjectParser;
16+
import org.elasticsearch.xcontent.ParseField;
17+
import org.elasticsearch.xcontent.ToXContentFragment;
18+
import org.elasticsearch.xcontent.XContentBuilder;
19+
import org.elasticsearch.xcontent.XContentParser;
20+
21+
import java.io.IOException;
22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.Objects;
25+
26+
/**
27+
* IndexReshardingMetadata holds persistent state managing an in-flight index resharding operation
28+
*
29+
* Resharding is changing the number of shards that comprise an index.
30+
* We currently only support splitting an index into an integer multiple of its current shard count,
31+
* e.g., going from 1 to 3 shards, or 2 to 4. This is due to the fact that we route documents to
32+
* shards by hash of the document id modulo the shard count. Multiplying the shard count under this
33+
* scheme lets us move only the fraction of the documents that route to new shards while the rest stay
34+
* where they were.
35+
*
36+
* During a split, we create new shards and then migrate the documents that belong to the new shards
37+
* according to the routing function to those new shards. While we're moving documents, search requests
38+
* may be ongoing, or new documents may be indexed. There must not be ambiguity about whether the source
39+
* shard or the target shards are responsible for documents being indexed or searched while this handoff
40+
* is occurring, to ensure that we don't lose or double-count documents during the process. We manage this
41+
* by maintaining the state of the split on the source and target shards, and making an atomic (from the point
42+
* of view of indexing and search requests) transition from handling requests that route to the target shard
43+
* on the source shard, to letting the new target shard handle them. This transition state is called SPLIT_HANDOFF:
44+
* before the handoff, the source shard has the entire document collection for both the source and target, and handles
45+
* indexing and search requests. After the handoff, documents that route to the target are handled by the target,
46+
* and the source does not necessarily have a complete view - it will be missing any documents that are indexed
47+
* to the target shard after SPLIT_HANDOFF. In fact, when the target becomes active, the source filters target documents
48+
* from its search results, so that they are not counted twice when the target shard is also searched. The handoff
49+
* is performed at the target by queueing incoming requests prior to entering HANDOFF, waiting for the target to
50+
* be RUNNING, and then forwarding requests for the target shard to the target. Similarly, when the target first
51+
* becomes active it must filter out search results containing documents owned by the source shard, which may be
52+
* present if the target was created by copying the source shard's Lucene files.
53+
*
54+
* To ensure that we always route requests to the correct shard, even in the case of failure of either the source or
55+
* target during split, we preserve the transition point in persistent state until the split is complete, so that
56+
* when the source or target recovers, it can resync and route correctly based on that state. This class holds the persistent
57+
* state required to recover correctly, always maintaining the invariant that only the source shard accepts indexing and search
58+
* requests for the target prior to SPLIT_HANDOFF, and only the target shard accepts them afterward.
59+
*
60+
* The state we preserve is:
61+
* * The old and new shard counts for a resize operation, so that the source and target shards
62+
* know whether they are source or target, and which is their peer. For example, old:1 new:2 lets
63+
* shard 1 figure out that it is the source shard for shard 2, and lets shard 2 figure out that it
64+
* is the target for shard 1.
65+
* * For each target shard, its current state, one of SPLITTING, HANDOFF, CLEANUP, or DONE. If the target is in
66+
* SPLITTING, then if either the source or target recovers, split is restarted from the top (though it may reuse any
67+
* files already transferred), and the source is responsible for handling indexing and search requests to the target shard.
68+
* If the target is in HANDOFF, then on recovery the source is not responsible for handling indexing and search requests
69+
* for the target, and must instead filter out target requests from local search.
70+
* If the target is in CLEANUP, then the source and target can both delete the other's documents from their local shard.
71+
* When the target has finished removing all the source's documents, it transitions to DONE and removes source filters.
72+
* As the source completes deletes for each target it can remove filters for that shard as well.
73+
* When all targets enter the DONE state and the source has completed removing the documents for every target shard, then
74+
* resharding is complete and the IndexReshardingMetadata can be removed from persistent state.
75+
*/
76+
public record IndexReshardingMetadata(int oldShardCount, int newShardCount, ShardReshardingState[] targetShardStates)
77+
implements
78+
ToXContentFragment,
79+
Writeable {
80+
public enum ShardReshardingState implements Writeable {
81+
SPLITTING,
82+
HANDOFF,
83+
CLEANUP,
84+
DONE;
85+
86+
@Override
87+
public void writeTo(StreamOutput out) throws IOException {
88+
out.writeEnum(this);
89+
}
90+
}
91+
92+
// Copying from IndexMetadataStats here
93+
public static final ParseField OLD_SHARD_COUNT_FIELD = new ParseField("old_shard_count");
94+
public static final ParseField NEW_SHARD_COUNT_FIELD = new ParseField("new_shard_count");
95+
public static final ParseField TARGET_SHARD_STATES_FIELD = new ParseField("target_shard_states");
96+
97+
@SuppressWarnings("unchecked")
98+
private static final ConstructingObjectParser<IndexReshardingMetadata, Void> PARSER = new ConstructingObjectParser<>(
99+
"index_resharding_metadata_parser",
100+
false,
101+
(args, unused) -> new IndexReshardingMetadata(
102+
(int) args[0],
103+
(int) args[1],
104+
((List<ShardReshardingState>) args[2]).toArray(new ShardReshardingState[0])
105+
)
106+
);
107+
108+
static {
109+
PARSER.declareInt(ConstructingObjectParser.constructorArg(), OLD_SHARD_COUNT_FIELD);
110+
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NEW_SHARD_COUNT_FIELD);
111+
// XXX I'm not sure this is the best way to parse an array of enums
112+
PARSER.declareObjectArray(
113+
ConstructingObjectParser.constructorArg(),
114+
(parser, c) -> ShardReshardingState.valueOf(parser.text()),
115+
TARGET_SHARD_STATES_FIELD
116+
);
117+
}
118+
119+
static IndexReshardingMetadata fromXContent(XContentParser parser) throws IOException {
120+
return PARSER.parse(parser, null);
121+
}
122+
123+
@Override
124+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
125+
builder.field(OLD_SHARD_COUNT_FIELD.getPreferredName(), oldShardCount);
126+
builder.field(NEW_SHARD_COUNT_FIELD.getPreferredName(), newShardCount);
127+
builder.field(TARGET_SHARD_STATES_FIELD.getPreferredName(), targetShardStates);
128+
return builder;
129+
}
130+
131+
@Override
132+
public void writeTo(StreamOutput out) throws IOException {
133+
out.writeInt(oldShardCount);
134+
out.writeInt(newShardCount);
135+
out.writeArray(targetShardStates);
136+
}
137+
138+
public IndexReshardingMetadata(StreamInput in) throws IOException {
139+
this(in.readInt(), in.readInt(), in.readArray(i -> i.readEnum(ShardReshardingState.class), ShardReshardingState[]::new));
140+
}
141+
142+
public IndexReshardingMetadata(int oldShardCount, int newShardCount) {
143+
this(oldShardCount, newShardCount, initialTargetShardStates(newShardCount - oldShardCount));
144+
}
145+
146+
public IndexReshardingMetadata(int oldShardCount, int newShardCount, ShardReshardingState[] targetShardStates) {
147+
assert newShardCount > oldShardCount : "Reshard currently only supports increasing the number of shards";
148+
assert newShardCount / oldShardCount * oldShardCount == newShardCount : "New shard count must be multiple of old shard count";
149+
assert targetShardStates.length == newShardCount - oldShardCount : "Must be one target shard state for each new shard";
150+
151+
this.oldShardCount = oldShardCount;
152+
this.newShardCount = newShardCount;
153+
this.targetShardStates = targetShardStates;
154+
}
155+
156+
// can't use record implementation because we need a deep comparison of targetShardStates
157+
@Override
158+
public boolean equals(Object other) {
159+
if (this == other) {
160+
return true;
161+
}
162+
if (other == null || getClass() != other.getClass()) {
163+
return false;
164+
}
165+
IndexReshardingMetadata otherMetadata = (IndexReshardingMetadata) other;
166+
return oldShardCount == otherMetadata.oldShardCount
167+
&& newShardCount == otherMetadata.newShardCount
168+
&& Arrays.equals(targetShardStates, otherMetadata.targetShardStates);
169+
}
170+
171+
@Override
172+
public int hashCode() {
173+
return Objects.hash(oldShardCount, newShardCount, Arrays.hashCode(targetShardStates));
174+
}
175+
176+
public void setTargetShardState(int shard, ShardReshardingState shardState) {
177+
targetShardStates[shard] = shardState;
178+
}
179+
180+
public ShardReshardingState getTargetShardState(int shard) {
181+
return targetShardStates[shard];
182+
}
183+
184+
private static ShardReshardingState[] initialTargetShardStates(int targetShardCount) {
185+
ShardReshardingState[] targetShardStates = new ShardReshardingState[targetShardCount];
186+
Arrays.fill(targetShardStates, ShardReshardingState.SPLITTING);
187+
return targetShardStates;
188+
}
189+
}

server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public void testIndexMetadataSerialization() throws IOException {
9494
Double indexWriteLoadForecast = randomBoolean() ? randomDoubleBetween(0.0, 128, true) : null;
9595
Long shardSizeInBytesForecast = randomBoolean() ? randomLongBetween(1024, 10240) : null;
9696
Map<String, InferenceFieldMetadata> inferenceFields = randomInferenceFields();
97+
IndexReshardingMetadata reshardingMetadata = randomBoolean() ? randomIndexReshardingMetadata(numShard) : null;
9798

9899
IndexMetadata metadata = IndexMetadata.builder("foo")
99100
.settings(indexSettings(numShard, numberOfReplicas).put("index.version.created", 1))
@@ -129,6 +130,7 @@ public void testIndexMetadataSerialization() throws IOException {
129130
IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(5000000, 5500000))
130131
)
131132
)
133+
.reshardingMetadata(reshardingMetadata)
132134
.build();
133135
assertEquals(system, metadata.isSystem());
134136

@@ -706,4 +708,14 @@ private IndexMetadataStats randomIndexStats(int numberOfShards) {
706708
}
707709
return new IndexMetadataStats(indexWriteLoadBuilder.build(), randomLongBetween(100, 1024), randomIntBetween(1, 2));
708710
}
711+
712+
private IndexReshardingMetadata randomIndexReshardingMetadata(int oldShards) {
713+
final int newShards = randomIntBetween(2, 5) * oldShards;
714+
final var targetReshardingStates = new IndexReshardingMetadata.ShardReshardingState[newShards - oldShards];
715+
for (int i = 0; i < targetReshardingStates.length; i++) {
716+
targetReshardingStates[i] = randomFrom(IndexReshardingMetadata.ShardReshardingState.values());
717+
}
718+
719+
return new IndexReshardingMetadata(oldShards, newShards, targetReshardingStates);
720+
}
709721
}

0 commit comments

Comments (0)