
Commit 3d7cbb2

Introduce IndexReshardingMetadata
This adds to IndexMetadata the persistent state we will need to track while a split is in progress. Nothing outside test code sets it yet, so this change introduces no wire changes. Follow-ups will consult this state to make routing decisions and to handle backward compatibility when the object is present in metadata.
Parent commit: 2c846e7

4 files changed: +358 additions, −7 deletions


server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 35 additions & 7 deletions

@@ -561,6 +561,8 @@ public Iterator<Setting<?>> settings() {
 
     public static final String KEY_INFERENCE_FIELDS = "field_inference";
 
+    public static final String KEY_RESHARDING = "resharding";
+
     public static final String INDEX_STATE_FILE_PREFIX = "state-";
 
     static final TransportVersion STATS_AND_FORECAST_ADDED = TransportVersions.V_8_6_0;
@@ -654,6 +656,8 @@ public Iterator<Setting<?>> settings() {
     private final Double writeLoadForecast;
     @Nullable
     private final Long shardSizeInBytesForecast;
+    @Nullable
+    private final IndexReshardingMetadata reshardingMetadata;
 
     private IndexMetadata(
         final Index index,
@@ -702,7 +706,8 @@ private IndexMetadata(
         final IndexVersion indexCompatibilityVersion,
         @Nullable final IndexMetadataStats stats,
         @Nullable final Double writeLoadForecast,
-        @Nullable Long shardSizeInBytesForecast
+        @Nullable Long shardSizeInBytesForecast,
+        @Nullable IndexReshardingMetadata reshardingMetadata
     ) {
         this.index = index;
         this.version = version;
@@ -761,6 +766,7 @@ private IndexMetadata(
         this.writeLoadForecast = writeLoadForecast;
         this.shardSizeInBytesForecast = shardSizeInBytesForecast;
         assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
+        this.reshardingMetadata = reshardingMetadata;
     }
 
     IndexMetadata withMappingMetadata(MappingMetadata mapping) {
@@ -814,7 +820,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -875,7 +882,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -934,7 +942,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -994,7 +1003,8 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -1049,7 +1059,8 @@ public IndexMetadata withIncrementedVersion() {
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -1905,6 +1916,7 @@ public static class Builder {
         private IndexMetadataStats stats = null;
         private Double indexWriteLoadForecast = null;
        private Long shardSizeInBytesForecast = null;
+        private IndexReshardingMetadata reshardingMetadata = null;
 
         public Builder(String index) {
             this.index = index;
@@ -1940,6 +1952,7 @@ public Builder(IndexMetadata indexMetadata) {
             this.stats = indexMetadata.stats;
             this.indexWriteLoadForecast = indexMetadata.writeLoadForecast;
             this.shardSizeInBytesForecast = indexMetadata.shardSizeInBytesForecast;
+            this.reshardingMetadata = indexMetadata.reshardingMetadata;
         }
 
         public Builder index(String index) {
@@ -2190,6 +2203,11 @@ public Builder putInferenceFields(Map<String, InferenceFieldMetadata> values) {
             return this;
         }
 
+        public Builder reshardingMetadata(IndexReshardingMetadata reshardingMetadata) {
+            this.reshardingMetadata = reshardingMetadata;
+            return this;
+        }
+
         public IndexMetadata build() {
             return build(false);
         }
@@ -2389,7 +2407,8 @@ IndexMetadata build(boolean repair) {
                 SETTING_INDEX_VERSION_COMPATIBILITY.get(settings),
                 stats,
                 indexWriteLoadForecast,
-                shardSizeInBytesForecast
+                shardSizeInBytesForecast,
+                reshardingMetadata
             );
         }
 
@@ -2529,6 +2548,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
                 builder.endObject();
             }
 
+            if (indexMetadata.reshardingMetadata != null) {
+                builder.startObject(KEY_RESHARDING);
+                indexMetadata.reshardingMetadata.toXContent(builder, params);
+                builder.endObject();
+            }
+
             builder.endObject();
         }
 
@@ -2615,6 +2640,9 @@ public static IndexMetadata fromXContent(XContentParser parser, Map<String, Mapp
                             builder.putInferenceField(InferenceFieldMetadata.fromXContent(parser));
                         }
                         break;
+                    case KEY_RESHARDING:
+                        builder.reshardingMetadata(IndexReshardingMetadata.fromXContent(parser));
+                        break;
                     default:
                         // assume it's custom index metadata
                         builder.putCustom(currentFieldName, parser.mapStrings());
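
Taken together, the IndexMetadata changes thread the new field through the private constructor, every with* copy method, the Builder, and the XContent round trip. Below is a minimal sketch of how test code might attach the state via the new Builder hook; the index name, settings, and shard counts are illustrative assumptions, not taken from this commit.

// Minimal sketch (assumed test-style usage, not code from this commit).
Settings settings = Settings.builder()
    .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)   // current (old) shard count
    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
    .build();

IndexMetadata indexMetadata = IndexMetadata.builder("test-index")
    .settings(settings)
    .reshardingMetadata(new IndexReshardingMetadata(2, 6)) // a 2 -> 6 split in flight
    .build();

// Every copy path above forwards the field, so derived metadata keeps it:
IndexMetadata bumped = indexMetadata.withIncrementedVersion();

// When non-null, toXContent emits it under KEY_RESHARDING; for a freshly
// started 2 -> 6 split the serialized object would look roughly like:
// "resharding": {
//   "old_shard_count": 2, "new_shard_count": 6,
//   "source_shard_states": ["SOURCE", "SOURCE"],
//   "target_shard_states": ["CLONE", "CLONE", "CLONE", "CLONE"]
// }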
server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java (new file)

Lines changed: 249 additions & 0 deletions

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * IndexReshardingMetadata holds persistent state managing an in-flight index resharding operation.
 *
 * Resharding is changing the number of shards that make up an index, in place.
 * We currently only support splitting an index into an integer multiple of its current shard count,
 * e.g., going from 1 to 3 shards, or 2 to 4. This is because we route documents to shards by hash of
 * the document id modulo the shard count. Multiplying the shard count under this scheme lets us move
 * only the fraction of the documents that route to new shards while the rest stay where they were.
 *
 * During a split, we create new shards and then migrate the documents that belong to the new shards
 * according to the routing function to those new shards. While we're moving documents, search requests
 * may be ongoing, or new documents may be indexed. There must not be ambiguity about whether the source
 * shard or the target shards are responsible for documents being indexed or searched while this handoff
 * is occurring, to ensure that we don't lose or double-count documents during the process. We prevent this
 * by maintaining the state of the split on the source and target shards, and making an atomic (from the point
 * of view of indexing and search requests) transition from handling requests that route to the target shard
 * on the source shard, to letting the target shard handle them.
 *
 * Before the handoff, the source shard has the entire document collection for both the source and target, and handles
 * indexing and search requests. After the handoff, documents that route to the target are handled by the target,
 * and the source does not necessarily have a complete view - it will be missing any documents that are indexed
 * to the target shard after handoff. Indeed, when the target becomes active, the source filters target documents
 * from its search results, so that they are not counted twice when the target shard is also searched. The handoff
 * is performed at the source by queueing incoming requests prior to entering handoff, waiting for the target to
 * be RUNNING, and then forwarding requests for the target shard to the target. Similarly, when the target first
 * becomes active it must filter out search results containing documents owned by the source shard, which may be
 * present if the target was created by copying the source shard's Lucene files.
 *
 * To ensure that we always route requests to the correct shard, even in the case of failure of either source or
 * target shards during split, we preserve the transition point in persistent state until the split is complete, so
 * that when the source or target recovers, it can resync and route correctly based on that state. This class holds
 * the persistent state required to recover correctly, always maintaining the invariant that only the source shard
 * accepts indexing and search requests for the target prior to handoff, and only the target shard accepts them afterward.
 *
 * The state we preserve is:
 *  * The old and new shard counts for a resize operation, so that we can always identify which shards are sources
 *    and which are targets during resharding. For example, old:2 new:6 implies (numbering shards from 1) that shard 1
 *    is the source shard for shards 3 and 5, and shard 2 is the source for shards 4 and 6.
 *  * For each source shard, its current source state, either `SOURCE` or `DONE`.
 *    - If a source shard may still contain data for any target shard then it is in state `SOURCE`.
 *    - When all targets for a source have moved to `SPLIT` (see below), then the source deletes all documents from
 *      its store that are now the responsibility of the target shards and transitions to `DONE`.
 *      This isn't strictly required to be persistent for correctness, but it can save time on recovery
 *      by allowing a DONE shard to skip interrogating targets and repeating cleanup.
 *  * For each target shard, its current target state, one of `CLONE`, `HANDOFF`, `SPLIT`, or `DONE`.
 *    - If the target has not yet copied all data from the source shard, then it is in `CLONE`.
 *    - It moves to `HANDOFF` when it has copied all of its data from the source, to indicate that it is now ready to
 *      receive indexing actions, and starts RUNNING. After this point, the source may no longer contain the entire
 *      contents of the target and must not index documents belonging to the target. But since search shards can't
 *      start up until their corresponding index nodes are RUNNING, search requests would fail if they routed to the
 *      target shard immediately after handoff. So at HANDOFF, the source shards continue to service searches, but
 *      block refresh since they cannot be guaranteed to have seen documents indexed after HANDOFF.
 *    - When the target shard's corresponding search replica has started running, the target requests that the source
 *      filter search results belonging to the target, and the target shard's state moves to `SPLIT`. The target's
 *      search replica likewise filters documents not belonging to the target, which may be present due to the target
 *      bootstrapping by copying the source's Lucene files.
 *    - Upon entering `SPLIT`, the target starts deleting all documents from its Lucene store that do not belong to it.
 *      When that is complete, it moves to `DONE` and removes filters for other shards, which are no longer necessary.
 *
 * Note that each target shard's split operates independently and all may happen concurrently.
 *
 * When all source shards have transitioned to `DONE`, the resize is complete and this metadata may be removed from
 * cluster state. We only allow at most a single resharding operation to be in flight for an index, so removing this
 * metadata is a prerequisite to beginning another resharding operation.
 */
public record IndexReshardingMetadata(
    int oldShardCount,
    int newShardCount,
    SourceShardState[] sourceShardStates,
    TargetShardState[] targetShardStates
) implements ToXContentFragment, Writeable {
    public enum SourceShardState implements Writeable {
        SOURCE,
        DONE;

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeEnum(this);
        }
    }

    public enum TargetShardState implements Writeable {
        CLONE,
        HANDOFF,
        SPLIT,
        DONE;

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeEnum(this);
        }
    }

    // Copying from IndexMetadataStats here
    public static final ParseField OLD_SHARD_COUNT_FIELD = new ParseField("old_shard_count");
    public static final ParseField NEW_SHARD_COUNT_FIELD = new ParseField("new_shard_count");
    public static final ParseField SOURCE_SHARD_STATES_FIELD = new ParseField("source_shard_states");
    public static final ParseField TARGET_SHARD_STATES_FIELD = new ParseField("target_shard_states");

    @SuppressWarnings("unchecked")
    private static final ConstructingObjectParser<IndexReshardingMetadata, Void> PARSER = new ConstructingObjectParser<>(
        "index_resharding_metadata_parser",
        false,
        (args, unused) -> new IndexReshardingMetadata(
            (int) args[0],
            (int) args[1],
            ((List<SourceShardState>) args[2]).toArray(new SourceShardState[0]),
            ((List<TargetShardState>) args[3]).toArray(new TargetShardState[0])
        )
    );

    static {
        PARSER.declareInt(ConstructingObjectParser.constructorArg(), OLD_SHARD_COUNT_FIELD);
        PARSER.declareInt(ConstructingObjectParser.constructorArg(), NEW_SHARD_COUNT_FIELD);
        // XXX I'm not sure this is the best way to parse an array of enums
        PARSER.declareObjectArray(
            ConstructingObjectParser.constructorArg(),
            (parser, c) -> SourceShardState.valueOf(parser.text()),
            SOURCE_SHARD_STATES_FIELD
        );
        PARSER.declareObjectArray(
            ConstructingObjectParser.constructorArg(),
            (parser, c) -> TargetShardState.valueOf(parser.text()),
            TARGET_SHARD_STATES_FIELD
        );
    }

    static IndexReshardingMetadata fromXContent(XContentParser parser) throws IOException {
        return PARSER.parse(parser, null);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.field(OLD_SHARD_COUNT_FIELD.getPreferredName(), oldShardCount);
        builder.field(NEW_SHARD_COUNT_FIELD.getPreferredName(), newShardCount);
        builder.field(SOURCE_SHARD_STATES_FIELD.getPreferredName(), sourceShardStates);
        builder.field(TARGET_SHARD_STATES_FIELD.getPreferredName(), targetShardStates);
        return builder;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeInt(oldShardCount);
        out.writeInt(newShardCount);
        out.writeArray(sourceShardStates);
        out.writeArray(targetShardStates);
    }

    public IndexReshardingMetadata(StreamInput in) throws IOException {
        this(
            in.readInt(),
            in.readInt(),
            in.readArray(i -> i.readEnum(SourceShardState.class), SourceShardState[]::new),
            in.readArray(i -> i.readEnum(TargetShardState.class), TargetShardState[]::new)
        );
    }

    public IndexReshardingMetadata(int oldShardCount, int newShardCount) {
        this(
            oldShardCount,
            newShardCount,
            initialSourceShardStates(oldShardCount),
            initialTargetShardStates(newShardCount - oldShardCount)
        );
    }

    public IndexReshardingMetadata(
        int oldShardCount,
        int newShardCount,
        SourceShardState[] sourceShardStates,
        TargetShardState[] targetShardStates
    ) {
        assert newShardCount > oldShardCount : "Reshard currently only supports increasing the number of shards";
        assert newShardCount / oldShardCount * oldShardCount == newShardCount : "New shard count must be a multiple of old shard count";
        assert sourceShardStates.length == oldShardCount : "Must be one source shard state for each old shard";
        assert targetShardStates.length == newShardCount - oldShardCount : "Must be one target shard state for each new shard";

        this.oldShardCount = oldShardCount;
        this.newShardCount = newShardCount;
        this.sourceShardStates = sourceShardStates;
        this.targetShardStates = targetShardStates;
    }

    // can't use the record-generated implementation because we need a deep comparison of the shard state arrays
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        IndexReshardingMetadata otherMetadata = (IndexReshardingMetadata) other;
        return oldShardCount == otherMetadata.oldShardCount
            && newShardCount == otherMetadata.newShardCount
            && Arrays.equals(sourceShardStates, otherMetadata.sourceShardStates)
            && Arrays.equals(targetShardStates, otherMetadata.targetShardStates);
    }

    @Override
    public int hashCode() {
        return Objects.hash(oldShardCount, newShardCount, Arrays.hashCode(sourceShardStates), Arrays.hashCode(targetShardStates));
    }

    public void setTargetShardState(int shard, TargetShardState shardState) {
        targetShardStates[shard] = shardState;
    }

    public TargetShardState getTargetShardState(int shard) {
        return targetShardStates[shard];
    }

    private static SourceShardState[] initialSourceShardStates(int sourceShardCount) {
        SourceShardState[] sourceShardStates = new SourceShardState[sourceShardCount];
        Arrays.fill(sourceShardStates, SourceShardState.SOURCE);
        return sourceShardStates;
    }

    private static TargetShardState[] initialTargetShardStates(int targetShardCount) {
        TargetShardState[] targetShardStates = new TargetShardState[targetShardCount];
        Arrays.fill(targetShardStates, TargetShardState.CLONE);
        return targetShardStates;
    }
}
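
The convenience constructor seeds every source shard at SOURCE and every target at CLONE, and the per-shard accessors index the target-state array directly. Below is a minimal sketch of driving one target shard through the transitions described in the class comment. It assumes 0-indexed shard ids (the Javadoc example counts from 1) and that callers address target states by offset from the first new shard; both are assumptions, since nothing outside test code uses this yet.

// Minimal sketch; not code from this commit. Assumes 0-indexed shard ids and
// that target states are addressed by offset from the first new shard.
IndexReshardingMetadata resharding = new IndexReshardingMetadata(2, 6); // a 2 -> 6 split
// Initial state: sources [SOURCE, SOURCE], targets [CLONE, CLONE, CLONE, CLONE].

int targetShardId = 3;                                          // one of the new shards (2..5)
int targetOffset = targetShardId - resharding.oldShardCount();  // 1
int sourceShardId = targetShardId % resharding.oldShardCount(); // 1, per the modulo-routing rule

assert resharding.getTargetShardState(targetOffset) == IndexReshardingMetadata.TargetShardState.CLONE;
resharding.setTargetShardState(targetOffset, IndexReshardingMetadata.TargetShardState.HANDOFF); // clone complete

A wire round trip can then be checked with the custom equals: writing via writeTo and reading back through the StreamInput constructor should produce an equal value, which is exactly what the deep comparison of the state arrays makes meaningful.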
